diff --git a/README.md b/README.md index a9b1751..4e69aa5 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,18 @@ The application reads configurations from the `.env` file at the root. | `PRICE_ANOMALY_THRESHOLD` | Anomaly detection threshold % | 10 | No | | `ADMIN_API_KEY` | Bootstrap admin bearer token for API key management | undefined | Yes, for protected endpoints | | `LOG_LEVEL` | Logging level | info | No | + +| `WEBHOOK_MAX_ATTEMPTS` | Total delivery attempts (initial + retries) | 3 | No | +| `WEBHOOK_RETRY_BASE_MS` | Base backoff between retries (ms) | 30000 | No | +| `WEBHOOK_RETRY_FACTOR` | Exponential backoff multiplier | 2 | No | +| `WEBHOOK_TIMEOUT_MS` | HTTP timeout per delivery attempt | 5000 | No | +| `WEBHOOK_RETRY_POLL_MS` | Retry worker poll interval | 5000 | No | +| `WEBHOOK_RETRY_BATCH` | Max retries processed per tick | 25 | No | +| `WEBHOOK_RATELIMIT_WINDOW` | Mgmt rate-limit window (s) | 60 | No | +| `WEBHOOK_RATELIMIT_MAX` | Mgmt rate-limit max requests / window / IP | 60 | No | +| `WEBHOOK_TEST_RATELIMIT_WINDOW` | Test endpoint rate-limit window (s) | 60 | No | +| `WEBHOOK_TEST_RATELIMIT_MAX` | Test endpoint rate-limit max / window / IP | 5 | No | + | `CORS_ALLOWED_ORIGINS` | Allowed origins split by commas | http://localhost:4000,http://localhost:3001 | No | |----------|-------------|---------|----------| | `NODE_ENV` | Runtime environment: `development`, `test`, or `production` | development | No | @@ -124,6 +136,7 @@ The application reads configurations from the `.env` file at the root. | `ADMIN_API_KEY` | Bootstrap admin bearer token for API key management | empty | Yes, for protected endpoints | | `LOG_LEVEL` | Logging level: `debug`, `info`, `warn`, or `error` | info | No | + --- ## API Endpoints @@ -244,8 +257,123 @@ curl http://localhost:4000/health ``` + +## Webhooks + +Register endpoints that receive HTTP POST callbacks when SmartDrop indexes farming/pool events. + +### Supported event types + +| Event | Description | +|-------|-------------| +| `pool.created` | A new farming pool was created on-chain | +| `pool.assets_locked` | Assets were locked into a pool | +| `pool.assets_unlocked` | Assets were unlocked from a pool | +| `pool.rewards_distributed` | Pool distributed rewards to participants | +| `pool.closed` | Pool was closed | +| `price.alert` | Existing price-alert event | +| `*` | Wildcard — subscribe to every known event | + +### API + +#### Register a webhook +``` +POST /api/v1/webhooks +Content-Type: application/json + +{ + "url": "https://example.com/webhooks/smartdrop", + "events": ["pool.assets_locked", "pool.rewards_distributed"], + "secret": "whsec_at_least_16_chars", // optional, generated if omitted + "description": "Production webhook" // optional +} +``` + +The response includes the secret in plaintext **exactly once**. Subsequent reads only return `secret_preview`. + +#### Manage webhooks +``` +GET /api/v1/webhooks # list +GET /api/v1/webhooks/:id # fetch one +PATCH /api/v1/webhooks/:id # update url / events / active / description +DELETE /api/v1/webhooks/:id # remove +``` + +#### Test endpoint +``` +POST /api/v1/webhooks/:id/test +``` +Sends a synthetic `pool.assets_locked` payload to the registered URL and returns the resulting delivery summary. Limited to 5 calls/min/IP by default. + +#### Inspect deliveries (admin dashboard feed) +``` +GET /api/v1/webhooks/:id/deliveries?limit=50 +``` +Returns the most recent delivery records: `status` (`success | pending | failed`), `attempts`, `response_status`, `last_error`, `next_retry_at`. + +### Outgoing request shape + +Every delivery is a JSON POST with the following headers: + +| Header | Value | +|--------|-------| +| `Content-Type` | `application/json` | +| `User-Agent` | `SmartDrop-Webhooks/1.0` | +| `X-SmartDrop-Event` | event type (e.g. `pool.assets_locked`) | +| `X-SmartDrop-Delivery` | unique delivery id (`dlv_…`) | +| `X-SmartDrop-Signature` | `sha256=` | + +Body: +```json +{ + "event": "pool.assets_locked", + "event_id": "evt_…", + "occurred_at": "2026-06-25T12:00:00.000Z", + "data": { "...": "event-specific fields" } +} +``` + +### Verifying the signature (Node.js) + +```js +const crypto = require('crypto'); + +function verifySmartDrop(req, secret) { + const provided = req.header('X-SmartDrop-Signature') || ''; + const expected = 'sha256=' + crypto + .createHmac('sha256', secret) + .update(req.rawBody) // verify against the RAW body, not re-stringified JSON + .digest('hex'); + const a = Buffer.from(provided); + const b = Buffer.from(expected); + return a.length === b.length && crypto.timingSafeEqual(a, b); +} +``` + +Express tip: capture the raw body via `express.json({ verify: (req, _res, buf) => { req.rawBody = buf.toString(); } })` so the HMAC matches byte-for-byte. + +### Retry & failure semantics + +- Up to `WEBHOOK_MAX_ATTEMPTS` (default 3) total attempts per event. +- Retries are scheduled in Redis and processed by a background worker, so retries survive process restarts. +- Backoff is exponential: `base * factor^(attempts-1)` (default 30s → 60s → 120s). +- **Retryable**: network errors, HTTP 5xx, 408, 429. +- **Not retried**: HTTP 4xx (except 408/429). These are marked `failed` immediately so a misconfigured consumer cannot be retried into the ground. +- Each delivery is logged in `webhook_deliveries` (Redis-backed today, drop-in PG migration documented in `src/repositories/deliveryRepository.js`). + +### Storage model + +The current implementation stores webhooks and delivery logs in Redis behind a repository abstraction. The repository files document the equivalent PostgreSQL schema verbatim — migrating to PG is a matter of swapping the repository implementation only; no caller code changes. + +### Rate limiting + +- Management endpoints under `/api/v1/webhooks`: 60 req/min/IP (configurable). +- `/test` endpoint: 5 req/min/IP (configurable) — prevents using SmartDrop as an outbound HTTP cannon. +- The limiter fails **open** if Redis is unreachable so a cache outage does not lock you out of management calls. + --- + ## Error Handling The API returns appropriate HTTP status codes: diff --git a/src/config.js b/src/config.js index 14ad113..80f1547 100644 --- a/src/config.js +++ b/src/config.js @@ -82,4 +82,20 @@ module.exports = { .split(',') .map((o) => o.trim()) .filter(Boolean), + webhooks: { + maxAttempts: parseInt(process.env.WEBHOOK_MAX_ATTEMPTS, 10) || 3, + retryBaseMs: parseInt(process.env.WEBHOOK_RETRY_BASE_MS, 10) || 30000, + retryFactor: parseFloat(process.env.WEBHOOK_RETRY_FACTOR) || 2, + timeoutMs: parseInt(process.env.WEBHOOK_TIMEOUT_MS, 10) || 5000, + retryPollMs: parseInt(process.env.WEBHOOK_RETRY_POLL_MS, 10) || 5000, + retryBatchSize: parseInt(process.env.WEBHOOK_RETRY_BATCH, 10) || 25, + rateLimit: { + windowSeconds: parseInt(process.env.WEBHOOK_RATELIMIT_WINDOW, 10) || 60, + max: parseInt(process.env.WEBHOOK_RATELIMIT_MAX, 10) || 60, + }, + testRateLimit: { + windowSeconds: parseInt(process.env.WEBHOOK_TEST_RATELIMIT_WINDOW, 10) || 60, + max: parseInt(process.env.WEBHOOK_TEST_RATELIMIT_MAX, 10) || 5, + }, + }, }; diff --git a/src/index.js b/src/index.js index fbbf102..7be389c 100644 --- a/src/index.js +++ b/src/index.js @@ -9,6 +9,10 @@ const { requestIdMiddleware } = require('./middleware/requestId'); const { requireApiKey } = require('./middleware/auth'); const pricesRouter = require('./routes/prices'); const alertsRouter = require('./routes/alerts'); + +const webhooksRouter = require('./routes/webhooks'); +const webhookRetryWorker = require('./jobs/webhookRetryWorker'); + const keysRouter = require('./routes/keys'); const webhooksRouter = require('./routes/webhooks'); const airdropsRouter = require('./routes/airdrops'); @@ -36,14 +40,41 @@ app.use('/api/v1', keysRouter); app.use('/api/v1/alerts', requireApiKey()); app.use('/api/v1', alertsRouter); app.use('/api/v1', webhooksRouter); + app.use('/api/v1', airdropsRouter); + app.use((err, req, res, _next) => { const status = err.status || 500; if (status >= 500) logger.error('Unhandled error', { error: err.message, stack: err.stack }); res.status(status).json({ error: err.message || 'Internal server error' }); }); + +const server = app.listen(config.port, () => { + logger.info(`SmartDrop backend running on port ${config.port}`); + priceRefreshJob.start(); + webhookRetryWorker.start(); +}); + +process.on('SIGTERM', async () => { + logger.info('SIGTERM received, shutting down'); + priceRefreshJob.stop(); + webhookRetryWorker.stop(); + server.close(); + await cache.disconnect(); + process.exit(0); +}); + +process.on('SIGINT', async () => { + logger.info('SIGINT received, shutting down'); + priceRefreshJob.stop(); + webhookRetryWorker.stop(); + server.close(); + await cache.disconnect(); + process.exit(0); +}); + // 1. Declaramos la variable server aquí afuera usando let (para que tenga alcance global en el archivo) let server; @@ -51,6 +82,7 @@ if (require.main === module) { // 2. Aquí adentro solo la asignamos (quitamos el 'const') let server; + if (require.main === module) { server = app.listen(config.port, () => { logger.info(`SmartDrop backend running on port ${config.port}`); @@ -74,6 +106,10 @@ if (require.main === module) { }); } + + +module.exports = {app, server}; + // 3. Ahora el export funcionará perfectamente, tanto si corre directo como en modo test module.exports = { app, server }; module.exports = app; @@ -83,3 +119,4 @@ module.exports.server = server || { if (callback) callback(); }, }; + diff --git a/src/jobs/webhookRetryWorker.js b/src/jobs/webhookRetryWorker.js new file mode 100644 index 0000000..924381e --- /dev/null +++ b/src/jobs/webhookRetryWorker.js @@ -0,0 +1,48 @@ +'use strict'; + +const config = require('../config'); +const logger = require('../logger'); +const dispatcher = require('../services/webhookDispatcher'); +const deliveryRepo = require('../repositories/deliveryRepository'); + +let timer = null; +let running = false; + +async function tick() { + if (running) return; + running = true; + try { + const ids = await deliveryRepo.popDueRetries(Date.now(), config.webhooks.retryBatchSize); + if (ids.length === 0) return; + logger.info('Processing webhook retries', { count: ids.length }); + for (const id of ids) { + try { + await dispatcher.attempt(id); + } catch (err) { + logger.error('Retry attempt failed', { delivery_id: id, error: err.message }); + } + } + } catch (err) { + logger.error('Webhook retry worker tick failed', { error: err.message }); + } finally { + running = false; + } +} + +function start() { + if (timer) return; + const interval = config.webhooks.retryPollMs; + timer = setInterval(tick, interval); + if (typeof timer.unref === 'function') timer.unref(); + logger.info('Webhook retry worker started', { intervalMs: interval }); +} + +function stop() { + if (timer) { + clearInterval(timer); + timer = null; + logger.info('Webhook retry worker stopped'); + } +} + +module.exports = { start, stop, tick }; diff --git a/src/middleware/rateLimit.js b/src/middleware/rateLimit.js new file mode 100644 index 0000000..f315ed9 --- /dev/null +++ b/src/middleware/rateLimit.js @@ -0,0 +1,50 @@ +'use strict'; + +const cache = require('../services/cache'); +const logger = require('../logger'); + +/** + * Fixed-window rate limiter backed by Redis INCR + EXPIRE. + * Fails open if Redis is unreachable so a cache outage cannot lock out users. + */ +function buildRateLimit({ windowSeconds, max, keyPrefix }) { + if (!Number.isFinite(windowSeconds) || windowSeconds <= 0) { + throw new Error('windowSeconds must be a positive number'); + } + if (!Number.isFinite(max) || max <= 0) { + throw new Error('max must be a positive number'); + } + if (!keyPrefix || typeof keyPrefix !== 'string') { + throw new Error('keyPrefix is required'); + } + + return async function rateLimit(req, res, next) { + const identifier = req.ip || req.connection?.remoteAddress || 'unknown'; + const bucket = Math.floor(Date.now() / 1000 / windowSeconds); + const key = `ratelimit:${keyPrefix}:${identifier}:${bucket}`; + + try { + const redis = cache.getClient(); + const count = await redis.incr(key); + if (count === 1) { + await redis.expire(key, windowSeconds); + } + const remaining = Math.max(0, max - count); + res.setHeader('X-RateLimit-Limit', String(max)); + res.setHeader('X-RateLimit-Remaining', String(remaining)); + res.setHeader('X-RateLimit-Reset', String((bucket + 1) * windowSeconds)); + if (count > max) { + return res.status(429).json({ + error: 'Too many requests', + message: `Rate limit of ${max} requests per ${windowSeconds}s exceeded`, + }); + } + return next(); + } catch (err) { + logger.warn('Rate limit fail-open due to cache error', { error: err.message }); + return next(); + } + }; +} + +module.exports = buildRateLimit; diff --git a/src/repositories/deliveryRepository.js b/src/repositories/deliveryRepository.js new file mode 100644 index 0000000..e897f40 --- /dev/null +++ b/src/repositories/deliveryRepository.js @@ -0,0 +1,114 @@ +'use strict'; + +/** + * Webhook delivery log repository. + * + * Schema mirrors the future PostgreSQL `webhook_deliveries` table: + * + * webhook_deliveries ( + * id text primary key, + * webhook_id text not null references webhooks(id) on delete cascade, + * event_id text not null, + * event_type text not null, + * status text not null, -- pending | success | failed + * attempts int not null default 0, + * last_error text, + * last_attempt_at timestamptz, + * next_retry_at timestamptz, + * response_status int, + * created_at timestamptz not null default now() + * ) + * + * Indexes that would back the queries below: + * (webhook_id, created_at desc) - listing recent deliveries per webhook + * (next_retry_at) - retry worker scan + */ + +const crypto = require('crypto'); +const cache = require('../services/cache'); + +const RETRY_QUEUE_KEY = 'webhooks:retries'; +const RECENT_DELIVERIES_LIMIT = 100; + +function key(id) { + return `webhook_delivery:${id}`; +} + +function indexKey(webhookId) { + return `webhook:${webhookId}:deliveries`; +} + +function generateId() { + return `dlv_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`; +} + +async function create({ webhook_id, event_id, event_type }) { + const id = generateId(); + const now = new Date().toISOString(); + const record = { + id, + webhook_id, + event_id, + event_type, + status: 'pending', + attempts: 0, + last_error: null, + last_attempt_at: null, + next_retry_at: null, + response_status: null, + created_at: now, + }; + + const redis = cache.getClient(); + await cache.set(key(id), record); + await redis.zadd(indexKey(webhook_id), Date.now(), id); + await redis.zremrangebyrank(indexKey(webhook_id), 0, -(RECENT_DELIVERIES_LIMIT + 1)); + return record; +} + +async function findById(id) { + return cache.get(key(id)); +} + +async function update(id, patch) { + const existing = await cache.get(key(id)); + if (!existing) return null; + const next = { ...existing, ...patch, id: existing.id }; + await cache.set(key(id), next); + return next; +} + +async function listByWebhook(webhookId, limit = 50) { + const redis = cache.getClient(); + const ids = await redis.zrevrange(indexKey(webhookId), 0, Math.max(0, limit - 1)); + const records = await Promise.all(ids.map((id) => cache.get(key(id)))); + return records.filter(Boolean); +} + +async function scheduleRetry(deliveryId, nextRetryAtMs) { + const redis = cache.getClient(); + await redis.zadd(RETRY_QUEUE_KEY, nextRetryAtMs, deliveryId); +} + +async function popDueRetries(nowMs, max = 25) { + const redis = cache.getClient(); + const ids = await redis.zrangebyscore(RETRY_QUEUE_KEY, '-inf', nowMs, 'LIMIT', 0, max); + if (ids.length === 0) return []; + await redis.zrem(RETRY_QUEUE_KEY, ...ids); + return ids; +} + +async function cancelRetry(deliveryId) { + const redis = cache.getClient(); + await redis.zrem(RETRY_QUEUE_KEY, deliveryId); +} + +module.exports = { + create, + findById, + update, + listByWebhook, + scheduleRetry, + popDueRetries, + cancelRetry, +}; diff --git a/src/repositories/webhookRepository.js b/src/repositories/webhookRepository.js new file mode 100644 index 0000000..cb031a8 --- /dev/null +++ b/src/repositories/webhookRepository.js @@ -0,0 +1,114 @@ +'use strict'; + +/** + * Webhook repository. + * + * Schema mirrors the future PostgreSQL `webhooks` table so that swapping the + * Redis backing for a real DB only requires re-implementing this module: + * + * webhooks ( + * id text primary key, + * url text not null, + * events text[] not null, + * secret text not null, + * active boolean not null default true, + * description text, + * created_at timestamptz not null default now(), + * updated_at timestamptz not null default now() + * ) + */ + +const crypto = require('crypto'); +const cache = require('../services/cache'); + +const IDS_KEY = 'webhooks:ids'; + +function key(id) { + return `webhook:${id}`; +} + +function generateId() { + return `wh_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`; +} + +function normalize(record) { + if (!record) return null; + return { + id: record.id, + url: record.url, + events: Array.isArray(record.events) ? [...record.events] : [], + secret: record.secret, + active: record.active !== false, + description: record.description || null, + created_at: record.created_at, + updated_at: record.updated_at, + }; +} + +async function create({ url, events, secret, description }) { + const id = generateId(); + const now = new Date().toISOString(); + const record = { + id, + url, + events, + secret, + active: true, + description: description || null, + created_at: now, + updated_at: now, + }; + const redis = cache.getClient(); + await cache.set(key(id), record); + await redis.sadd(IDS_KEY, id); + return normalize(record); +} + +async function findById(id) { + const record = await cache.get(key(id)); + return normalize(record); +} + +async function list() { + const redis = cache.getClient(); + const ids = await redis.smembers(IDS_KEY); + const records = await Promise.all(ids.map((id) => cache.get(key(id)))); + return records.filter(Boolean).map(normalize); +} + +async function listActiveForEvent(eventType, matcher) { + const all = await list(); + return all.filter((w) => w.active && matcher(w.events, eventType)); +} + +async function update(id, patch) { + const existing = await cache.get(key(id)); + if (!existing) return null; + const next = { + ...existing, + ...patch, + id: existing.id, + created_at: existing.created_at, + updated_at: new Date().toISOString(), + }; + await cache.set(key(id), next); + return normalize(next); +} + +async function remove(id) { + const redis = cache.getClient(); + const existing = await cache.get(key(id)); + if (!existing) return null; + await cache.del(key(id)); + await redis.srem(IDS_KEY, id); + return normalize(existing); +} + +module.exports = { + create, + findById, + list, + listActiveForEvent, + update, + remove, +}; diff --git a/src/routes/webhooks.js b/src/routes/webhooks.js index 085407d..636cd27 100644 --- a/src/routes/webhooks.js +++ b/src/routes/webhooks.js @@ -1,3 +1,37 @@ + +'use strict'; + +const express = require('express'); +const logger = require('../logger'); +const config = require('../config'); +const webhookRepo = require('../repositories/webhookRepository'); +const deliveryRepo = require('../repositories/deliveryRepository'); +const dispatcher = require('../services/webhookDispatcher'); +const signatureService = require('../services/webhookSignature'); +const events = require('../services/webhookEvents'); +const buildRateLimit = require('../middleware/rateLimit'); + +const router = express.Router(); + +const manageLimit = buildRateLimit({ + windowSeconds: config.webhooks.rateLimit.windowSeconds, + max: config.webhooks.rateLimit.max, + keyPrefix: 'webhooks', +}); + +const testLimit = buildRateLimit({ + windowSeconds: config.webhooks.testRateLimit.windowSeconds, + max: config.webhooks.testRateLimit.max, + keyPrefix: 'webhooks_test', +}); + +router.use('/webhooks', manageLimit); + +function isValidUrl(str) { + try { + const u = new URL(str); + return u.protocol === 'http:' || u.protocol === 'https:'; + const express = require('express'); const webhooks = require('../services/webhooks'); const logger = require('../logger'); @@ -8,11 +42,31 @@ function isValidUrl(value) { try { const url = new URL(value); return url.protocol === 'http:' || url.protocol === 'https:'; + } catch { return false; } } + +function validateCreate(body) { + if (!body || typeof body !== 'object') return 'body must be an object'; + const { url, events: subscribedEvents, secret, description } = body; + + if (!url || !isValidUrl(url)) { + return 'url must be a valid http(s) URL'; + } + if (!events.isValidSubscription(subscribedEvents)) { + return `events must be a non-empty array of: ${events.ALL_EVENTS.join(', ')} or "*"`; + } + if (secret !== undefined) { + if (typeof secret !== 'string' || secret.length < 16) { + return 'secret must be a string of at least 16 characters'; + } + } + if (description !== undefined && typeof description !== 'string') { + return 'description must be a string'; + function validateEndpoint(body) { if (!body || !isValidUrl(body.url)) { return 'url must be a valid HTTP or HTTPS URL'; @@ -25,17 +79,55 @@ function validateEndpoint(body) { } if (!body.secret || typeof body.secret !== 'string' || body.secret.length < 8) { return 'secret must be at least 8 characters'; + } return null; } + +function publicView(webhook) { + if (!webhook) return null; + return { + id: webhook.id, + url: webhook.url, + events: webhook.events, + active: webhook.active, + description: webhook.description, + created_at: webhook.created_at, + updated_at: webhook.updated_at, + secret_preview: webhook.secret ? `${webhook.secret.slice(0, 10)}…` : null, + }; +} + +router.post('/webhooks', async (req, res) => { + try { + const validationError = validateCreate(req.body); + router.post('/webhooks', async (req, res) => { try { const validationError = validateEndpoint(req.body); + if (validationError) { return res.status(400).json({ error: 'Validation error', message: validationError }); } + + const secret = req.body.secret || signatureService.generateSecret(); + const webhook = await webhookRepo.create({ + url: req.body.url, + events: req.body.events, + secret, + description: req.body.description, + }); + + return res.status(201).json({ + ...publicView(webhook), + secret, + secret_warning: 'Store this secret now — it will not be shown again in plaintext.', + }); + } catch (err) { + logger.error('Create webhook error', { error: err.message }); + const endpoint = await webhooks.createEndpoint({ url: req.body.url, events: [...new Set(req.body.events)], @@ -45,22 +137,85 @@ router.post('/webhooks', async (req, res) => { return res.status(201).json(endpoint); } catch (err) { logger.error('Create webhook endpoint error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); } }); router.get('/webhooks', async (_req, res) => { try { + + const webhooks = await webhookRepo.list(); + return res.json({ webhooks: webhooks.map(publicView) }); + } catch (err) { + logger.error('List webhooks error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/webhooks/:id', async (req, res) => { + try { + const webhook = await webhookRepo.findById(req.params.id); + if (!webhook) return res.status(404).json({ error: 'Webhook not found' }); + return res.json(publicView(webhook)); + } catch (err) { + logger.error('Get webhook error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.patch('/webhooks/:id', async (req, res) => { + try { + const patch = {}; + if (req.body.url !== undefined) { + if (!isValidUrl(req.body.url)) { + return res.status(400).json({ error: 'Validation error', message: 'url must be a valid http(s) URL' }); + } + patch.url = req.body.url; + } + if (req.body.events !== undefined) { + if (!events.isValidSubscription(req.body.events)) { + return res.status(400).json({ error: 'Validation error', message: 'events invalid' }); + } + patch.events = req.body.events; + } + if (req.body.active !== undefined) { + if (typeof req.body.active !== 'boolean') { + return res.status(400).json({ error: 'Validation error', message: 'active must be boolean' }); + } + patch.active = req.body.active; + } + if (req.body.description !== undefined) { + if (typeof req.body.description !== 'string') { + return res.status(400).json({ error: 'Validation error', message: 'description must be a string' }); + } + patch.description = req.body.description; + } + + const updated = await webhookRepo.update(req.params.id, patch); + if (!updated) return res.status(404).json({ error: 'Webhook not found' }); + return res.json(publicView(updated)); + } catch (err) { + logger.error('Update webhook error', { error: err.message }); + const endpoints = await webhooks.listEndpoints(); return res.json({ webhooks: endpoints }); } catch (err) { logger.error('List webhook endpoints error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); } }); router.delete('/webhooks/:id', async (req, res) => { try { + + const deleted = await webhookRepo.remove(req.params.id); + if (!deleted) return res.status(404).json({ error: 'Webhook not found' }); + return res.json({ deleted: true, id: req.params.id }); + } catch (err) { + logger.error('Delete webhook error', { error: err.message }); + const deleted = await webhooks.removeEndpoint(req.params.id); if (!deleted) { return res.status(404).json({ error: 'Webhook endpoint not found' }); @@ -68,10 +223,26 @@ router.delete('/webhooks/:id', async (req, res) => { return res.json({ deleted: true, webhook: deleted }); } catch (err) { logger.error('Delete webhook endpoint error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); } }); + +router.post('/webhooks/:id/test', testLimit, async (req, res) => { + try { + const delivery = await dispatcher.sendTest(req.params.id); + if (!delivery) return res.status(404).json({ error: 'Webhook not found' }); + return res.status(202).json({ + delivery_id: delivery.id, + status: delivery.status, + attempts: delivery.attempts, + response_status: delivery.response_status, + last_error: delivery.last_error, + }); + } catch (err) { + logger.error('Test webhook error', { error: err.message }); + router.post('/webhooks/:id/test', async (req, res) => { try { const delivery = await webhooks.sendTestPing(req.params.id); @@ -81,12 +252,22 @@ router.post('/webhooks/:id/test', async (req, res) => { return res.status(202).json({ delivery }); } catch (err) { logger.error('Test webhook delivery error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); } }); router.get('/webhooks/:id/deliveries', async (req, res) => { try { + + const webhook = await webhookRepo.findById(req.params.id); + if (!webhook) return res.status(404).json({ error: 'Webhook not found' }); + const limit = Math.min(parseInt(req.query.limit, 10) || 50, 100); + const deliveries = await deliveryRepo.listByWebhook(req.params.id, limit); + return res.json({ deliveries }); + } catch (err) { + logger.error('List deliveries error', { error: err.message }); + const endpoint = await webhooks.getEndpoint(req.params.id); if (!endpoint || !endpoint.active) { return res.status(404).json({ error: 'Webhook endpoint not found' }); @@ -97,6 +278,7 @@ router.get('/webhooks/:id/deliveries', async (req, res) => { return res.json({ deliveries }); } catch (err) { logger.error('List webhook deliveries error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); } }); diff --git a/src/services/webhookDispatcher.js b/src/services/webhookDispatcher.js new file mode 100644 index 0000000..45ce9ca --- /dev/null +++ b/src/services/webhookDispatcher.js @@ -0,0 +1,193 @@ +'use strict'; + +const axios = require('axios'); +const config = require('../config'); +const logger = require('../logger'); +const signature = require('./webhookSignature'); +const events = require('./webhookEvents'); +const webhookRepo = require('../repositories/webhookRepository'); +const deliveryRepo = require('../repositories/deliveryRepository'); + +const USER_AGENT = 'SmartDrop-Webhooks/1.0'; + +function backoffMs(attemptsCompleted) { + const base = config.webhooks.retryBaseMs; + const factor = config.webhooks.retryFactor; + return base * factor ** (attemptsCompleted - 1); +} + +function shouldRetry(responseStatus, networkError) { + if (networkError) return true; + if (responseStatus == null) return true; + if (responseStatus >= 500 && responseStatus < 600) return true; + if (responseStatus === 408 || responseStatus === 429) return true; + return false; +} + +function buildHeaders(secret, body, eventType, deliveryId) { + return { + 'Content-Type': 'application/json', + 'User-Agent': USER_AGENT, + 'X-SmartDrop-Event': eventType, + 'X-SmartDrop-Delivery': deliveryId, + 'X-SmartDrop-Signature': signature.sign(secret, body), + }; +} + +async function postOnce(url, headers, body) { + return axios.post(url, body, { + headers, + timeout: config.webhooks.timeoutMs, + transformRequest: [(data) => data], + validateStatus: () => true, + }); +} + +async function attempt(deliveryId) { + const delivery = await deliveryRepo.findById(deliveryId); + if (!delivery) { + logger.warn('Delivery missing, dropping retry', { delivery_id: deliveryId }); + return null; + } + if (delivery.status === 'success') return delivery; + + const webhook = await webhookRepo.findById(delivery.webhook_id); + if (!webhook || !webhook.active) { + return deliveryRepo.update(deliveryId, { + status: 'failed', + last_error: 'webhook missing or inactive', + last_attempt_at: new Date().toISOString(), + next_retry_at: null, + }); + } + + const payload = delivery.payload || { + event: delivery.event_type, + event_id: delivery.event_id, + delivery_id: delivery.id, + occurred_at: delivery.created_at, + }; + const body = JSON.stringify(payload); + const headers = buildHeaders(webhook.secret, body, delivery.event_type, delivery.id); + + const attempts = delivery.attempts + 1; + let responseStatus = null; + let networkError = null; + + try { + const res = await postOnce(webhook.url, headers, body); + responseStatus = res.status; + } catch (err) { + networkError = err.message || 'network error'; + } + + const succeeded = responseStatus != null && responseStatus >= 200 && responseStatus < 300; + const nowIso = new Date().toISOString(); + + if (succeeded) { + logger.info('Webhook delivered', { + delivery_id: delivery.id, + webhook_id: webhook.id, + attempts, + status: responseStatus, + }); + return deliveryRepo.update(deliveryId, { + status: 'success', + attempts, + last_attempt_at: nowIso, + next_retry_at: null, + last_error: null, + response_status: responseStatus, + }); + } + + const errorMessage = networkError || `HTTP ${responseStatus}`; + const retryable = shouldRetry(responseStatus, Boolean(networkError)); + const hasAttemptsLeft = attempts < config.webhooks.maxAttempts; + + if (retryable && hasAttemptsLeft) { + const delayMs = backoffMs(attempts); + const nextRetryAt = new Date(Date.now() + delayMs).toISOString(); + await deliveryRepo.scheduleRetry(delivery.id, Date.now() + delayMs); + logger.warn('Webhook delivery failed, retry scheduled', { + delivery_id: delivery.id, + webhook_id: webhook.id, + attempts, + error: errorMessage, + next_retry_at: nextRetryAt, + }); + return deliveryRepo.update(deliveryId, { + status: 'pending', + attempts, + last_attempt_at: nowIso, + next_retry_at: nextRetryAt, + last_error: errorMessage, + response_status: responseStatus, + }); + } + + logger.error('Webhook delivery failed permanently', { + delivery_id: delivery.id, + webhook_id: webhook.id, + attempts, + error: errorMessage, + }); + return deliveryRepo.update(deliveryId, { + status: 'failed', + attempts, + last_attempt_at: nowIso, + next_retry_at: null, + last_error: errorMessage, + response_status: responseStatus, + }); +} + +async function deliverToWebhook(webhook, eventType, eventId, payload) { + const delivery = await deliveryRepo.create({ + webhook_id: webhook.id, + event_id: eventId, + event_type: eventType, + }); + await deliveryRepo.update(delivery.id, { payload }); + return attempt(delivery.id); +} + +async function dispatch({ event_type: eventType, event_id: eventId, data }) { + if (!events.isKnownEvent(eventType)) { + logger.warn('Dispatch skipped, unknown event type', { event_type: eventType }); + return []; + } + if (!eventId || typeof eventId !== 'string') { + throw new Error('event_id is required to dispatch a webhook event'); + } + + const targets = await webhookRepo.listActiveForEvent(eventType, events.matchesSubscription); + if (targets.length === 0) return []; + + const occurredAt = new Date().toISOString(); + const payload = { + event: eventType, + event_id: eventId, + occurred_at: occurredAt, + data: data || {}, + }; + + return Promise.all( + targets.map((webhook) => deliverToWebhook(webhook, eventType, eventId, payload)) + ); +} + +async function sendTest(webhookId) { + const webhook = await webhookRepo.findById(webhookId); + if (!webhook) return null; + const eventType = 'pool.assets_locked'; + const payload = { + event: eventType, + event_id: `evt_test_${Date.now()}`, + occurred_at: new Date().toISOString(), + data: { test: true, message: 'This is a test delivery from SmartDrop' }, + }; + return deliverToWebhook(webhook, eventType, payload.event_id, payload); +} + +module.exports = { dispatch, attempt, sendTest, backoffMs, shouldRetry }; diff --git a/src/services/webhookEvents.js b/src/services/webhookEvents.js new file mode 100644 index 0000000..7b05280 --- /dev/null +++ b/src/services/webhookEvents.js @@ -0,0 +1,41 @@ +'use strict'; + +const POOL_EVENTS = Object.freeze([ + 'pool.created', + 'pool.assets_locked', + 'pool.assets_unlocked', + 'pool.rewards_distributed', + 'pool.closed', +]); + +const PRICE_EVENTS = Object.freeze(['price.alert']); + +const ALL_EVENTS = Object.freeze([...POOL_EVENTS, ...PRICE_EVENTS]); +const EVENT_SET = new Set(ALL_EVENTS); + +const WILDCARD = '*'; + +function isKnownEvent(eventType) { + return typeof eventType === 'string' && EVENT_SET.has(eventType); +} + +function isValidSubscription(events) { + if (!Array.isArray(events) || events.length === 0) return false; + return events.every((e) => e === WILDCARD || EVENT_SET.has(e)); +} + +function matchesSubscription(subscribedEvents, eventType) { + if (!Array.isArray(subscribedEvents) || subscribedEvents.length === 0) return false; + if (subscribedEvents.includes(WILDCARD)) return true; + return subscribedEvents.includes(eventType); +} + +module.exports = { + POOL_EVENTS, + PRICE_EVENTS, + ALL_EVENTS, + WILDCARD, + isKnownEvent, + isValidSubscription, + matchesSubscription, +}; diff --git a/src/services/webhookSignature.js b/src/services/webhookSignature.js new file mode 100644 index 0000000..867ab8c --- /dev/null +++ b/src/services/webhookSignature.js @@ -0,0 +1,36 @@ +'use strict'; + +const crypto = require('crypto'); + +const SIGNATURE_PREFIX = 'sha256='; + +function sign(secret, body) { + if (typeof secret !== 'string' || secret.length === 0) { + throw new Error('signature secret must be a non-empty string'); + } + const payload = typeof body === 'string' ? body : JSON.stringify(body); + const digest = crypto.createHmac('sha256', secret).update(payload).digest('hex'); + return `${SIGNATURE_PREFIX}${digest}`; +} + +function verify(secret, body, providedSignature) { + if (typeof providedSignature !== 'string' || !providedSignature.startsWith(SIGNATURE_PREFIX)) { + return false; + } + let expected; + try { + expected = sign(secret, body); + } catch { + return false; + } + const a = Buffer.from(expected); + const b = Buffer.from(providedSignature); + if (a.length !== b.length) return false; + return crypto.timingSafeEqual(a, b); +} + +function generateSecret(bytes = 32) { + return `whsec_${crypto.randomBytes(bytes).toString('hex')}`; +} + +module.exports = { sign, verify, generateSecret, SIGNATURE_PREFIX }; diff --git a/test/helpers/cacheMock.js b/test/helpers/cacheMock.js new file mode 100644 index 0000000..e7f3f0c --- /dev/null +++ b/test/helpers/cacheMock.js @@ -0,0 +1,99 @@ +'use strict'; + +/** + * In-memory mock of the ioredis surface used by src/services/cache.js. + * Covers strings (used by cache.get/set/del), SETs, and sorted SETs. + */ +function createCacheMock() { + const store = new Map(); + const sets = new Map(); + const zsets = new Map(); + const counters = new Map(); + + function getSet(key) { + if (!sets.has(key)) sets.set(key, new Set()); + return sets.get(key); + } + function getZSet(key) { + if (!zsets.has(key)) zsets.set(key, new Map()); + return zsets.get(key); + } + + const redis = { + smembers: jest.fn(async (key) => [...(sets.get(key) || [])]), + sadd: jest.fn(async (key, val) => { getSet(key).add(val); }), + srem: jest.fn(async (key, val) => { sets.get(key)?.delete(val); }), + zadd: jest.fn(async (key, score, member) => { getZSet(key).set(member, Number(score)); }), + zrem: jest.fn(async (key, ...members) => { + const z = zsets.get(key); + if (!z) return; + for (const m of members) z.delete(m); + }), + zrevrange: jest.fn(async (key, start, stop) => { + const z = zsets.get(key); + if (!z) return []; + const sorted = [...z.entries()].sort((a, b) => b[1] - a[1]).map(([m]) => m); + return sorted.slice(start, stop + 1); + }), + zrangebyscore: jest.fn(async (key, min, max, ...rest) => { + const z = zsets.get(key); + if (!z) return []; + const minScore = min === '-inf' ? -Infinity : Number(min); + const maxScore = max === '+inf' ? Infinity : Number(max); + let sorted = [...z.entries()] + .filter(([, score]) => score >= minScore && score <= maxScore) + .sort((a, b) => a[1] - b[1]) + .map(([m]) => m); + const limitIdx = rest.indexOf('LIMIT'); + if (limitIdx !== -1) { + const offset = Number(rest[limitIdx + 1]); + const count = Number(rest[limitIdx + 2]); + sorted = sorted.slice(offset, offset + count); + } + return sorted; + }), + zremrangebyrank: jest.fn(async (key, start, stop) => { + const z = zsets.get(key); + if (!z) return; + const sortedAsc = [...z.entries()].sort((a, b) => a[1] - b[1]).map(([m]) => m); + const end = stop < 0 ? sortedAsc.length + stop : stop; + const begin = start < 0 ? sortedAsc.length + start : start; + for (let i = begin; i <= end && i < sortedAsc.length; i += 1) { + z.delete(sortedAsc[i]); + } + }), + incr: jest.fn(async (key) => { + const n = (counters.get(key) || 0) + 1; + counters.set(key, n); + return n; + }), + expire: jest.fn(async () => 1), + }; + + const cacheMock = { + getClient: () => redis, + isConnected: () => true, + get: jest.fn(async (key) => { + const v = store.get(key); + return v !== undefined ? JSON.parse(JSON.stringify(v)) : null; + }), + set: jest.fn(async (key, value) => { store.set(key, JSON.parse(JSON.stringify(value))); }), + del: jest.fn(async (key) => { store.delete(key); }), + disconnect: jest.fn(async () => {}), + }; + + function reset() { + store.clear(); + sets.clear(); + zsets.clear(); + counters.clear(); + Object.values(redis).forEach((fn) => fn.mockClear?.()); + cacheMock.get.mockClear(); + cacheMock.set.mockClear(); + cacheMock.del.mockClear(); + } + + return { cacheMock, redis, store, sets, zsets, counters, reset }; +} + +module.exports = { createCacheMock }; diff --git a/test/rateLimit.test.js b/test/rateLimit.test.js new file mode 100644 index 0000000..629409e --- /dev/null +++ b/test/rateLimit.test.js @@ -0,0 +1,49 @@ +'use strict'; + +const express = require('express'); +const request = require('supertest'); +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const buildRateLimit = require('../src/middleware/rateLimit'); + +function buildApp(limiter) { + const app = express(); + app.use(limiter); + app.get('/test', (_req, res) => res.json({ ok: true })); + return app; +} + +beforeEach(() => reset()); + +describe('rateLimit middleware', () => { + test('allows requests under the limit and sets rate-limit headers', async () => { + const app = buildApp(buildRateLimit({ windowSeconds: 60, max: 3, keyPrefix: 't' })); + const r1 = await request(app).get('/test'); + expect(r1.status).toBe(200); + expect(r1.headers['x-ratelimit-limit']).toBe('3'); + expect(r1.headers['x-ratelimit-remaining']).toBe('2'); + }); + + test('returns 429 once the limit is exceeded', async () => { + const app = buildApp(buildRateLimit({ windowSeconds: 60, max: 2, keyPrefix: 'lim' })); + await request(app).get('/test'); + await request(app).get('/test'); + const blocked = await request(app).get('/test'); + expect(blocked.status).toBe(429); + expect(blocked.body.error).toBe('Too many requests'); + }); + + test('throws when configured with invalid options', () => { + expect(() => buildRateLimit({ windowSeconds: 0, max: 10, keyPrefix: 'x' })).toThrow(); + expect(() => buildRateLimit({ windowSeconds: 60, max: 0, keyPrefix: 'x' })).toThrow(); + expect(() => buildRateLimit({ windowSeconds: 60, max: 10 })).toThrow(); + }); +}); diff --git a/test/webhookDispatcher.test.js b/test/webhookDispatcher.test.js new file mode 100644 index 0000000..34e144a --- /dev/null +++ b/test/webhookDispatcher.test.js @@ -0,0 +1,241 @@ +'use strict'; + +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset, zsets } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const mockAxiosPost = jest.fn(); +jest.mock('axios', () => ({ post: (...args) => mockAxiosPost(...args) })); + +const dispatcher = require('../src/services/webhookDispatcher'); +const webhookRepo = require('../src/repositories/webhookRepository'); +const deliveryRepo = require('../src/repositories/deliveryRepository'); +const signature = require('../src/services/webhookSignature'); + +beforeEach(() => { + reset(); + mockAxiosPost.mockReset(); +}); + +async function createWebhook(overrides = {}) { + return webhookRepo.create({ + url: 'https://example.com/hook', + events: ['pool.assets_locked'], + secret: 'whsec_aaaaaaaaaaaaaaaa', + ...overrides, + }); +} + +describe('dispatcher delivery success', () => { + test('successful 200 marks delivery as success with attempts=1', async () => { + const w = await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_1', + data: { pool_id: 'p1' }, + }); + + expect(delivery.status).toBe('success'); + expect(delivery.attempts).toBe(1); + expect(delivery.response_status).toBe(200); + expect(mockAxiosPost).toHaveBeenCalledTimes(1); + + const [url, body, opts] = mockAxiosPost.mock.calls[0]; + expect(url).toBe(w.url); + expect(typeof body).toBe('string'); + const parsed = JSON.parse(body); + expect(parsed.event).toBe('pool.assets_locked'); + expect(parsed.event_id).toBe('evt_1'); + expect(parsed.data).toEqual({ pool_id: 'p1' }); + expect(opts.headers['X-SmartDrop-Signature']).toBe(signature.sign(w.secret, body)); + expect(opts.headers['X-SmartDrop-Event']).toBe('pool.assets_locked'); + }); +}); + +describe('dispatcher event-type filtering', () => { + test('only webhooks subscribed to the event receive a delivery', async () => { + await createWebhook({ url: 'https://a.com', events: ['pool.assets_locked'] }); + await createWebhook({ url: 'https://b.com', events: ['pool.closed'] }); + await createWebhook({ url: 'https://c.com', events: ['*'] }); + mockAxiosPost.mockResolvedValue({ status: 200 }); + + const results = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_42', + }); + + expect(results).toHaveLength(2); + const urls = mockAxiosPost.mock.calls.map((c) => c[0]).sort(); + expect(urls).toEqual(['https://a.com', 'https://c.com']); + }); + + test('inactive webhooks are skipped', async () => { + const w = await createWebhook(); + await webhookRepo.update(w.id, { active: false }); + mockAxiosPost.mockResolvedValue({ status: 200 }); + + const results = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_skip', + }); + expect(results).toHaveLength(0); + expect(mockAxiosPost).not.toHaveBeenCalled(); + }); + + test('unknown event types do not dispatch', async () => { + await createWebhook(); + const results = await dispatcher.dispatch({ + event_type: 'foo.bar', + event_id: 'evt_x', + }); + expect(results).toEqual([]); + expect(mockAxiosPost).not.toHaveBeenCalled(); + }); +}); + +describe('dispatcher retry semantics', () => { + test('5xx schedules a retry and keeps status pending', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 503 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_retry', + }); + + expect(delivery.status).toBe('pending'); + expect(delivery.attempts).toBe(1); + expect(delivery.next_retry_at).not.toBeNull(); + expect(delivery.last_error).toBe('HTTP 503'); + const queued = zsets.get('webhooks:retries'); + expect(queued.size).toBe(1); + expect([...queued.keys()][0]).toBe(delivery.id); + }); + + test('4xx (non-429) does NOT retry and marks failed', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 400 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_4xx', + }); + + expect(delivery.status).toBe('failed'); + expect(delivery.attempts).toBe(1); + expect(delivery.next_retry_at).toBeNull(); + const queued = zsets.get('webhooks:retries') || new Map(); + expect(queued.size).toBe(0); + }); + + test('network errors trigger a retry', async () => { + await createWebhook(); + mockAxiosPost.mockRejectedValueOnce(new Error('ECONNREFUSED')); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_net', + }); + + expect(delivery.status).toBe('pending'); + expect(delivery.last_error).toBe('ECONNREFUSED'); + expect(delivery.next_retry_at).not.toBeNull(); + }); + + test('after maxAttempts failures the delivery is permanently failed', async () => { + await createWebhook(); + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_max', + }); + + mockAxiosPost.mockResolvedValue({ status: 500 }); + const second = await dispatcher.attempt(delivery.id); + const third = await dispatcher.attempt(delivery.id); + + expect(third.status).toBe('failed'); + expect(third.attempts).toBe(3); + expect(third.next_retry_at).toBeNull(); + expect(second.status).toBe('pending'); + }); + + test('429 is treated as retryable', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 429 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_429', + }); + expect(delivery.status).toBe('pending'); + expect(delivery.next_retry_at).not.toBeNull(); + }); +}); + +describe('exponential backoff', () => { + test('delay grows by retryFactor each attempt', () => { + const d1 = dispatcher.backoffMs(1); + const d2 = dispatcher.backoffMs(2); + const d3 = dispatcher.backoffMs(3); + expect(d2).toBeGreaterThan(d1); + expect(d3).toBeGreaterThan(d2); + }); +}); + +describe('shouldRetry decision table', () => { + test('retries on network error', () => expect(dispatcher.shouldRetry(null, true)).toBe(true)); + test('retries on 500', () => expect(dispatcher.shouldRetry(500, false)).toBe(true)); + test('retries on 503', () => expect(dispatcher.shouldRetry(503, false)).toBe(true)); + test('retries on 408', () => expect(dispatcher.shouldRetry(408, false)).toBe(true)); + test('retries on 429', () => expect(dispatcher.shouldRetry(429, false)).toBe(true)); + test('does not retry on 400', () => expect(dispatcher.shouldRetry(400, false)).toBe(false)); + test('does not retry on 404', () => expect(dispatcher.shouldRetry(404, false)).toBe(false)); + test('does not retry on 200', () => expect(dispatcher.shouldRetry(200, false)).toBe(false)); +}); + +describe('sendTest', () => { + test('sends a test event to a specific webhook', async () => { + const w = await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + const delivery = await dispatcher.sendTest(w.id); + expect(delivery.status).toBe('success'); + expect(mockAxiosPost).toHaveBeenCalledTimes(1); + const body = JSON.parse(mockAxiosPost.mock.calls[0][1]); + expect(body.data.test).toBe(true); + }); + + test('returns null for unknown webhook', async () => { + const result = await dispatcher.sendTest('wh_unknown'); + expect(result).toBeNull(); + }); +}); + +describe('delivery payload persistence', () => { + test('payload is persisted so retries do not lose event data', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 500 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_payload', + data: { important: 'value' }, + }); + + const persisted = await deliveryRepo.findById(delivery.id); + expect(persisted.payload.data.important).toBe('value'); + + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + const retried = await dispatcher.attempt(delivery.id); + expect(retried.status).toBe('success'); + const body = JSON.parse(mockAxiosPost.mock.calls[1][1]); + expect(body.data.important).toBe('value'); + }); +}); diff --git a/test/webhookEvents.test.js b/test/webhookEvents.test.js new file mode 100644 index 0000000..c174077 --- /dev/null +++ b/test/webhookEvents.test.js @@ -0,0 +1,38 @@ +'use strict'; + +const events = require('../src/services/webhookEvents'); + +describe('webhook event registry', () => { + test('pool.assets_locked is a known event', () => { + expect(events.isKnownEvent('pool.assets_locked')).toBe(true); + }); + + test('unknown event types are rejected', () => { + expect(events.isKnownEvent('something.random')).toBe(false); + }); + + test('isValidSubscription accepts a non-empty array of known events', () => { + expect(events.isValidSubscription(['pool.assets_locked'])).toBe(true); + expect(events.isValidSubscription(['pool.assets_locked', 'pool.closed'])).toBe(true); + }); + + test('isValidSubscription accepts wildcard', () => { + expect(events.isValidSubscription(['*'])).toBe(true); + }); + + test('isValidSubscription rejects empty arrays and bad inputs', () => { + expect(events.isValidSubscription([])).toBe(false); + expect(events.isValidSubscription(null)).toBe(false); + expect(events.isValidSubscription(['nope'])).toBe(false); + }); + + test('matchesSubscription exact match', () => { + expect(events.matchesSubscription(['pool.assets_locked'], 'pool.assets_locked')).toBe(true); + expect(events.matchesSubscription(['pool.closed'], 'pool.assets_locked')).toBe(false); + }); + + test('matchesSubscription wildcard subscribes to all', () => { + expect(events.matchesSubscription(['*'], 'pool.assets_locked')).toBe(true); + expect(events.matchesSubscription(['*'], 'price.alert')).toBe(true); + }); +}); diff --git a/test/webhookRepository.test.js b/test/webhookRepository.test.js new file mode 100644 index 0000000..1897db6 --- /dev/null +++ b/test/webhookRepository.test.js @@ -0,0 +1,78 @@ +'use strict'; + +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const webhookRepo = require('../src/repositories/webhookRepository'); +const events = require('../src/services/webhookEvents'); + +beforeEach(() => reset()); + +describe('webhookRepository', () => { + test('create persists a webhook with generated id and active=true', async () => { + const w = await webhookRepo.create({ + url: 'https://example.com/hook', + events: ['pool.assets_locked'], + secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + expect(w.id).toMatch(/^wh_/); + expect(w.active).toBe(true); + expect(w.events).toEqual(['pool.assets_locked']); + }); + + test('findById returns the stored webhook', async () => { + const created = await webhookRepo.create({ + url: 'https://example.com/hook', + events: ['*'], + secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const found = await webhookRepo.findById(created.id); + expect(found.id).toBe(created.id); + }); + + test('findById returns null when missing', async () => { + expect(await webhookRepo.findById('wh_nope')).toBeNull(); + }); + + test('list returns all created webhooks', async () => { + await webhookRepo.create({ url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + await webhookRepo.create({ url: 'https://b.com', events: ['*'], secret: 'whsec_bbbbbbbbbbbbbbbb' }); + const all = await webhookRepo.list(); + expect(all).toHaveLength(2); + }); + + test('update merges patch and bumps updated_at', async () => { + const w = await webhookRepo.create({ url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + const updated = await webhookRepo.update(w.id, { active: false }); + expect(updated.active).toBe(false); + expect(updated.created_at).toBe(w.created_at); + expect(updated.updated_at >= w.updated_at).toBe(true); + }); + + test('remove deletes and returns the previous record', async () => { + const w = await webhookRepo.create({ url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + const removed = await webhookRepo.remove(w.id); + expect(removed.id).toBe(w.id); + expect(await webhookRepo.list()).toHaveLength(0); + }); + + test('listActiveForEvent filters by subscription and active flag', async () => { + const a = await webhookRepo.create({ url: 'https://a.com', events: ['pool.assets_locked'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + const b = await webhookRepo.create({ url: 'https://b.com', events: ['pool.closed'], secret: 'whsec_bbbbbbbbbbbbbbbb' }); + const c = await webhookRepo.create({ url: 'https://c.com', events: ['*'], secret: 'whsec_cccccccccccccccc' }); + await webhookRepo.update(c.id, { active: false }); + + const result = await webhookRepo.listActiveForEvent('pool.assets_locked', events.matchesSubscription); + const ids = result.map((w) => w.id).sort(); + expect(ids).toEqual([a.id].sort()); + expect(ids).not.toContain(b.id); + expect(ids).not.toContain(c.id); + }); +}); diff --git a/test/webhookSignature.test.js b/test/webhookSignature.test.js index ba0646f..d39a0e3 100644 --- a/test/webhookSignature.test.js +++ b/test/webhookSignature.test.js @@ -1,5 +1,55 @@ 'use strict'; + +const signature = require('../src/services/webhookSignature'); + +describe('webhook signature', () => { + const secret = 'whsec_test_supersecret_value'; + const body = JSON.stringify({ event: 'pool.assets_locked', amount: 42 }); + + test('sign produces a sha256= prefixed hex string', () => { + const sig = signature.sign(secret, body); + expect(sig).toMatch(/^sha256=[0-9a-f]{64}$/); + }); + + test('verify returns true for matching body and signature', () => { + const sig = signature.sign(secret, body); + expect(signature.verify(secret, body, sig)).toBe(true); + }); + + test('verify returns false when body is tampered', () => { + const sig = signature.sign(secret, body); + const tampered = body.replace('42', '43'); + expect(signature.verify(secret, tampered, sig)).toBe(false); + }); + + test('verify returns false when signature is tampered', () => { + const sig = signature.sign(secret, body); + const tampered = sig.replace(/.$/, sig.endsWith('a') ? 'b' : 'a'); + expect(signature.verify(secret, body, tampered)).toBe(false); + }); + + test('verify returns false when signature lacks the prefix', () => { + const sig = signature.sign(secret, body).replace('sha256=', ''); + expect(signature.verify(secret, body, sig)).toBe(false); + }); + + test('verify returns false for empty/wrong secret', () => { + const sig = signature.sign(secret, body); + expect(signature.verify('other_secret_value', body, sig)).toBe(false); + }); + + test('generateSecret produces a whsec_-prefixed token', () => { + const s = signature.generateSecret(); + expect(s).toMatch(/^whsec_[0-9a-f]{64}$/); + }); + + test('sign accepts objects by stringifying them', () => { + const obj = { a: 1, b: 'two' }; + const sigFromObj = signature.sign(secret, obj); + const sigFromStr = signature.sign(secret, JSON.stringify(obj)); + expect(sigFromObj).toBe(sigFromStr); + const http = require('http'); const { buildSignatureHeaders, diff --git a/test/webhooks.routes.test.js b/test/webhooks.routes.test.js new file mode 100644 index 0000000..6552ddd --- /dev/null +++ b/test/webhooks.routes.test.js @@ -0,0 +1,177 @@ +'use strict'; + +const express = require('express'); +const request = require('supertest'); +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const mockAxiosPost = jest.fn(); +jest.mock('axios', () => ({ post: (...args) => mockAxiosPost(...args) })); + +const webhooksRouter = require('../src/routes/webhooks'); + +function buildApp() { + const app = express(); + app.use(express.json()); + app.use('/api/v1', webhooksRouter); + return app; +} + +beforeEach(() => { + reset(); + mockAxiosPost.mockReset(); +}); + +describe('POST /api/v1/webhooks', () => { + const app = buildApp(); + + test('creates a webhook with valid input', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ + url: 'https://example.com/hook', + events: ['pool.assets_locked'], + secret: 'whsec_user_supplied_secret_long_enough', + }); + expect(res.status).toBe(201); + expect(res.body.id).toMatch(/^wh_/); + expect(res.body.events).toEqual(['pool.assets_locked']); + expect(res.body.secret).toBe('whsec_user_supplied_secret_long_enough'); + expect(res.body.secret_warning).toMatch(/Store this secret/); + }); + + test('generates a secret when none provided', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'https://example.com/hook', events: ['*'] }); + expect(res.status).toBe(201); + expect(res.body.secret).toMatch(/^whsec_[0-9a-f]+$/); + }); + + test('rejects invalid url', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'not-a-url', events: ['*'] }); + expect(res.status).toBe(400); + }); + + test('rejects unknown event types', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'https://example.com/hook', events: ['totally.fake'] }); + expect(res.status).toBe(400); + }); + + test('rejects too-short secret', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'https://example.com/hook', events: ['*'], secret: 'short' }); + expect(res.status).toBe(400); + }); +}); + +describe('GET /api/v1/webhooks', () => { + const app = buildApp(); + + test('lists registered webhooks without leaking full secrets', async () => { + await request(app).post('/api/v1/webhooks').send({ + url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const res = await request(app).get('/api/v1/webhooks'); + expect(res.status).toBe(200); + expect(res.body.webhooks).toHaveLength(1); + expect(res.body.webhooks[0].secret_preview).toMatch(/^whsec_/); + expect(res.body.webhooks[0]).not.toHaveProperty('secret'); + }); +}); + +describe('GET /api/v1/webhooks/:id', () => { + const app = buildApp(); + + test('returns 404 for unknown webhook', async () => { + const res = await request(app).get('/api/v1/webhooks/wh_nope'); + expect(res.status).toBe(404); + }); +}); + +describe('DELETE /api/v1/webhooks/:id', () => { + const app = buildApp(); + + test('deletes a registered webhook', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const del = await request(app).delete(`/api/v1/webhooks/${created.body.id}`); + expect(del.status).toBe(200); + expect(del.body.deleted).toBe(true); + + const list = await request(app).get('/api/v1/webhooks'); + expect(list.body.webhooks).toHaveLength(0); + }); + + test('returns 404 when deleting unknown id', async () => { + const res = await request(app).delete('/api/v1/webhooks/wh_nope'); + expect(res.status).toBe(404); + }); +}); + +describe('PATCH /api/v1/webhooks/:id', () => { + const app = buildApp(); + + test('updates active status', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const res = await request(app) + .patch(`/api/v1/webhooks/${created.body.id}`) + .send({ active: false }); + expect(res.status).toBe(200); + expect(res.body.active).toBe(false); + }); +}); + +describe('POST /api/v1/webhooks/:id/test', () => { + const app = buildApp(); + + test('sends a test delivery and returns delivery summary', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + + const res = await request(app).post(`/api/v1/webhooks/${created.body.id}/test`); + expect(res.status).toBe(202); + expect(res.body.delivery_id).toMatch(/^dlv_/); + expect(res.body.status).toBe('success'); + expect(mockAxiosPost).toHaveBeenCalledTimes(1); + }); + + test('returns 404 when webhook does not exist', async () => { + const res = await request(app).post('/api/v1/webhooks/wh_nope/test'); + expect(res.status).toBe(404); + }); +}); + +describe('GET /api/v1/webhooks/:id/deliveries', () => { + const app = buildApp(); + + test('lists deliveries for a webhook', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + mockAxiosPost.mockResolvedValue({ status: 200 }); + await request(app).post(`/api/v1/webhooks/${created.body.id}/test`); + + const res = await request(app).get(`/api/v1/webhooks/${created.body.id}/deliveries`); + expect(res.status).toBe(200); + expect(Array.isArray(res.body.deliveries)).toBe(true); + expect(res.body.deliveries.length).toBeGreaterThan(0); + }); +});