From 7755dd79bda7f3e12e162340ce80fd051f9d4a96 Mon Sep 17 00:00:00 2001 From: michaelvic123 Date: Mon, 29 Jun 2026 14:49:58 +0100 Subject: [PATCH] security: verify payload integrity before processing notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - add payload-integrity.ts — hashPayload() and verifyPayloadIntegrity() using HMAC-SHA256 with timing-safe comparison - store payload_hash at schedule time (requires PAYLOAD_INTEGRITY_SECRET env) - add payload_hash column via additive ALTER TABLE migration (idempotent) - verify hash in scheduler before execution — tampered payloads are immediately failed (retries exhausted) and logged as errors - 13 tests: unit (hash/verify) + integration (store, match, tamper, valid) --- listener/src/database/database.ts | 7 + .../src/services/notification-scheduler.ts | 25 +++ .../scheduled-notification-repository.ts | 13 +- listener/src/tests/payload-integrity.test.ts | 154 ++++++++++++++++++ listener/src/types/scheduled-notification.ts | 2 + listener/src/utils/payload-integrity.ts | 27 +++ 6 files changed, 225 insertions(+), 3 deletions(-) create mode 100644 listener/src/tests/payload-integrity.test.ts create mode 100644 listener/src/utils/payload-integrity.ts diff --git a/listener/src/database/database.ts b/listener/src/database/database.ts index 6ae3b94..4e5474e 100644 --- a/listener/src/database/database.ts +++ b/listener/src/database/database.ts @@ -84,6 +84,13 @@ export class Database { // Execute the schema as one script so trigger bodies with semicolons work. await this.exec(schema); + // Additive migrations — safe to run repeatedly (errors ignored if column exists) + await this.run( + `ALTER TABLE scheduled_notifications ADD COLUMN payload_hash TEXT` + ).catch(() => { + // Column already exists — nothing to do + }); + logger.info('Database migrations completed'); } diff --git a/listener/src/services/notification-scheduler.ts b/listener/src/services/notification-scheduler.ts index 4b36704..5140b27 100644 --- a/listener/src/services/notification-scheduler.ts +++ b/listener/src/services/notification-scheduler.ts @@ -4,6 +4,7 @@ import { generateRequestId } from '../utils/request-id'; import { ScheduledNotificationRepository } from './scheduled-notification-repository'; import { SchedulerConfig, NotificationStatus, ScheduledNotification } from '../types/scheduled-notification'; import { DiscordNotificationService } from './discord-notification'; +import { verifyPayloadIntegrity } from '../utils/payload-integrity'; /** * Background scheduler that processes scheduled notifications @@ -185,6 +186,30 @@ export class NotificationScheduler { return; } + // Verify payload integrity before executing + const secret = process.env.PAYLOAD_INTEGRITY_SECRET; + if (secret) { + if (!notification.payloadHash) { + logger.warn('Payload integrity check skipped — no hash stored', { + requestId, + id: notification.id, + }); + } else if (!verifyPayloadIntegrity(notification.payload, notification.payloadHash, secret)) { + logger.error('Payload integrity verification failed — rejecting notification', { + requestId, + id: notification.id, + type: notification.notificationType, + }); + await this.repository.markAsFailedOrRetry( + notification.id!, + new Error('Payload integrity check failed: hash mismatch'), + notification.maxRetries, // exhaust retries — don't retry a tampered payload + notification.maxRetries + ); + return; + } + } + // Execute notification based on type const success = await this.executeNotification(notification, requestId); diff --git a/listener/src/services/scheduled-notification-repository.ts b/listener/src/services/scheduled-notification-repository.ts index b326707..9d199cb 100644 --- a/listener/src/services/scheduled-notification-repository.ts +++ b/listener/src/services/scheduled-notification-repository.ts @@ -7,6 +7,7 @@ import { NotificationStatus, NotificationExecutionLog, } from '../types/scheduled-notification'; +import { hashPayload } from '../utils/payload-integrity'; /** * Repository for scheduled notifications database operations @@ -19,15 +20,20 @@ export class ScheduledNotificationRepository { * Create a new scheduled notification */ async create(input: CreateScheduledNotificationInput, requestId?: string): Promise { + const payloadJson = JSON.stringify(input.payload); + const secret = process.env.PAYLOAD_INTEGRITY_SECRET; + const payloadHash = secret ? hashPayload(payloadJson, secret) : null; + const sql = ` INSERT INTO scheduled_notifications ( - payload, notification_type, target_recipient, execute_at, + payload, payload_hash, notification_type, target_recipient, execute_at, max_retries, event_id, contract_address, priority, metadata - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `; const params = [ - JSON.stringify(input.payload), + payloadJson, + payloadHash, input.notificationType, input.targetRecipient, input.executeAt.toISOString(), @@ -326,6 +332,7 @@ export class ScheduledNotificationRepository { return { id: row.id, payload: row.payload, + payloadHash: row.payload_hash, notificationType: row.notification_type as any, targetRecipient: row.target_recipient, executeAt: new Date(row.execute_at), diff --git a/listener/src/tests/payload-integrity.test.ts b/listener/src/tests/payload-integrity.test.ts new file mode 100644 index 0000000..755b6b8 --- /dev/null +++ b/listener/src/tests/payload-integrity.test.ts @@ -0,0 +1,154 @@ +import { hashPayload, verifyPayloadIntegrity } from '../utils/payload-integrity'; +import { Database } from '../database/database'; +import { ScheduledNotificationRepository } from '../services/scheduled-notification-repository'; +import { NotificationAPI } from '../services/notification-api'; +import { NotificationType } from '../types/scheduled-notification'; +import * as fs from 'fs'; +import * as path from 'path'; + +const SECRET = 'test-secret-key'; +const TEST_DB = './data/test-integrity.db'; + +// ─── Unit tests ────────────────────────────────────────────────────────────── + +describe('hashPayload', () => { + it('produces a hex string', () => { + const hash = hashPayload('{"foo":"bar"}', SECRET); + expect(hash).toMatch(/^[a-f0-9]{64}$/); + }); + + it('is deterministic', () => { + const payload = '{"event":"test"}'; + expect(hashPayload(payload, SECRET)).toBe(hashPayload(payload, SECRET)); + }); + + it('differs when payload changes', () => { + expect(hashPayload('{"a":1}', SECRET)).not.toBe(hashPayload('{"a":2}', SECRET)); + }); + + it('differs when secret changes', () => { + const payload = '{"a":1}'; + expect(hashPayload(payload, 'secret-a')).not.toBe(hashPayload(payload, 'secret-b')); + }); +}); + +describe('verifyPayloadIntegrity', () => { + it('returns true for a valid payload/hash pair', () => { + const payload = '{"message":"hello"}'; + const hash = hashPayload(payload, SECRET); + expect(verifyPayloadIntegrity(payload, hash, SECRET)).toBe(true); + }); + + it('returns false when payload is tampered', () => { + const original = '{"amount":100}'; + const hash = hashPayload(original, SECRET); + expect(verifyPayloadIntegrity('{"amount":999}', hash, SECRET)).toBe(false); + }); + + it('returns false when hash is wrong', () => { + const payload = '{"ok":true}'; + expect(verifyPayloadIntegrity(payload, 'deadbeef', SECRET)).toBe(false); + }); + + it('returns false for empty payload', () => { + expect(verifyPayloadIntegrity('', hashPayload('', SECRET), SECRET)).toBe(false); + }); + + it('returns false for empty hash', () => { + expect(verifyPayloadIntegrity('{"x":1}', '', SECRET)).toBe(false); + }); +}); + +// ─── Integration tests ─────────────────────────────────────────────────────── + +describe('payload integrity — repository and scheduler integration', () => { + let db: Database; + let repository: ScheduledNotificationRepository; + let api: NotificationAPI; + + beforeAll(async () => { + process.env.PAYLOAD_INTEGRITY_SECRET = SECRET; + + const dir = path.dirname(TEST_DB); + if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); + if (fs.existsSync(TEST_DB)) fs.unlinkSync(TEST_DB); + + db = new Database(TEST_DB); + await db.initialize(); + repository = new ScheduledNotificationRepository(db); + api = new NotificationAPI(repository); + }); + + afterAll(async () => { + delete process.env.PAYLOAD_INTEGRITY_SECRET; + await db.close(); + if (fs.existsSync(TEST_DB)) fs.unlinkSync(TEST_DB); + }); + + beforeEach(async () => { + await db.run('DELETE FROM notification_execution_log'); + await db.run('DELETE FROM scheduled_notifications'); + }); + + it('stores a payload_hash when secret is set', async () => { + const id = await api.scheduleNotification({ + payload: { message: 'hello' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook/test', + executeAt: new Date(Date.now() + 60000), + }); + + const notification = await repository.getById(id); + expect(notification!.payloadHash).toBeTruthy(); + expect(notification!.payloadHash).toMatch(/^[a-f0-9]{64}$/); + }); + + it('stored hash matches the payload', async () => { + const payload = { message: 'integrity check' }; + const id = await api.scheduleNotification({ + payload, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook/test', + executeAt: new Date(Date.now() + 60000), + }); + + const notification = await repository.getById(id); + const expected = hashPayload(JSON.stringify(payload), SECRET); + expect(notification!.payloadHash).toBe(expected); + }); + + it('detects a tampered payload', async () => { + const id = await api.scheduleNotification({ + payload: { amount: 100 }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook/test', + executeAt: new Date(Date.now() + 60000), + }); + + // Tamper directly in the DB + await db.run('UPDATE scheduled_notifications SET payload = ? WHERE id = ?', [ + JSON.stringify({ amount: 999 }), + id, + ]); + + const notification = await repository.getById(id); + expect( + verifyPayloadIntegrity(notification!.payload, notification!.payloadHash!, SECRET) + ).toBe(false); + }); + + it('verifies an untampered payload successfully', async () => { + const payload = { event: 'TaskCreated', ledger: 42 }; + const id = await api.scheduleNotification({ + payload, + notificationType: NotificationType.DISCORD, + targetRecipient: 'https://discord.com/webhook/test', + executeAt: new Date(Date.now() + 60000), + }); + + const notification = await repository.getById(id); + expect( + verifyPayloadIntegrity(notification!.payload, notification!.payloadHash!, SECRET) + ).toBe(true); + }); +}); diff --git a/listener/src/types/scheduled-notification.ts b/listener/src/types/scheduled-notification.ts index f564afc..b5b839e 100644 --- a/listener/src/types/scheduled-notification.ts +++ b/listener/src/types/scheduled-notification.ts @@ -20,6 +20,7 @@ export enum NotificationType { export interface ScheduledNotification { id?: number; payload: string; // JSON string + payloadHash?: string | null; notificationType: NotificationType; targetRecipient: string; executeAt: Date; @@ -55,6 +56,7 @@ export interface CreateScheduledNotificationInput { export interface ScheduledNotificationRow { id: number; payload: string; + payload_hash: string | null; notification_type: string; target_recipient: string; execute_at: string; diff --git a/listener/src/utils/payload-integrity.ts b/listener/src/utils/payload-integrity.ts new file mode 100644 index 0000000..6d3ad60 --- /dev/null +++ b/listener/src/utils/payload-integrity.ts @@ -0,0 +1,27 @@ +import crypto from 'crypto'; + +/** + * Computes an HMAC-SHA256 hash of a payload string. + * The secret should come from the PAYLOAD_INTEGRITY_SECRET env var. + */ +export function hashPayload(payload: string, secret: string): string { + return crypto.createHmac('sha256', secret).update(payload, 'utf8').digest('hex'); +} + +/** + * Verifies that a payload matches its stored hash using a timing-safe comparison. + * Returns false if either argument is missing or the hash doesn't match. + */ +export function verifyPayloadIntegrity( + payload: string, + storedHash: string, + secret: string +): boolean { + if (!payload || !storedHash) return false; + + const expected = hashPayload(payload, secret); + + if (expected.length !== storedHash.length) return false; + + return crypto.timingSafeEqual(Buffer.from(expected, 'utf8'), Buffer.from(storedHash, 'utf8')); +}