From ac2ded0b63fbd260b24b1f04258f2f54e641a16f Mon Sep 17 00:00:00 2001 From: Osuochasam Date: Mon, 29 Jun 2026 14:14:00 +0100 Subject: [PATCH] test: reduce storage and transmission costs by compressing large notification payloads before processing/storing them, and decompressing them upon retrieval --- listener/src/database/schema.sql | 2 +- .../scheduled-notification-repository.ts | 7 +- .../src/utils/payload-compression.test.ts | 58 ++++++++++ listener/src/utils/payload-compression.ts | 102 ++++++++++++++++++ 4 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 listener/src/utils/payload-compression.test.ts create mode 100644 listener/src/utils/payload-compression.ts diff --git a/listener/src/database/schema.sql b/listener/src/database/schema.sql index b303b4a..ef62fe9 100644 --- a/listener/src/database/schema.sql +++ b/listener/src/database/schema.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS scheduled_notifications ( id INTEGER PRIMARY KEY AUTOINCREMENT, -- Notification content and metadata - payload TEXT NOT NULL, -- JSON payload of the notification + payload TEXT NOT NULL, -- JSON payload of the notification (compressed when large) notification_type VARCHAR(50) NOT NULL, -- Type: 'discord', 'email', 'webhook', etc. target_recipient TEXT NOT NULL, -- User ID, webhook URL, or recipient identifier diff --git a/listener/src/services/scheduled-notification-repository.ts b/listener/src/services/scheduled-notification-repository.ts index b326707..f64a37e 100644 --- a/listener/src/services/scheduled-notification-repository.ts +++ b/listener/src/services/scheduled-notification-repository.ts @@ -1,5 +1,6 @@ import { Database } from '../database/database'; import logger from '../utils/logger'; +import { compressPayload, decompressPayload } from '../utils/payload-compression'; import { ScheduledNotification, ScheduledNotificationRow, @@ -26,8 +27,10 @@ export class ScheduledNotificationRepository { ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `; + const serializedPayload = compressPayload(input.payload); + const params = [ - JSON.stringify(input.payload), + serializedPayload, input.notificationType, input.targetRecipient, input.executeAt.toISOString(), @@ -325,7 +328,7 @@ export class ScheduledNotificationRepository { private rowToModel(row: ScheduledNotificationRow): ScheduledNotification { return { id: row.id, - payload: row.payload, + payload: decompressPayload(row.payload), notificationType: row.notification_type as any, targetRecipient: row.target_recipient, executeAt: new Date(row.execute_at), diff --git a/listener/src/utils/payload-compression.test.ts b/listener/src/utils/payload-compression.test.ts new file mode 100644 index 0000000..f3a18d5 --- /dev/null +++ b/listener/src/utils/payload-compression.test.ts @@ -0,0 +1,58 @@ +import { + compressPayload, + decompressPayload, + COMPRESSION_PREFIX, + DEFAULT_COMPRESSION_THRESHOLD_BYTES, +} from './payload-compression'; + +describe('payload compression utilities', () => { + it('compresses and restores a large payload without data loss', () => { + const payload = { + event: 'large-notification', + message: 'x'.repeat(15000), + details: Array.from({ length: 200 }, (_, index) => ({ + id: index, + label: `entry-${index}`, + text: 'compressed-payload'.repeat(20), + })), + }; + + const serialized = JSON.stringify(payload); + const compressed = compressPayload(payload); + const restored = decompressPayload(compressed); + + expect(compressed).toContain(COMPRESSION_PREFIX); + expect(Buffer.byteLength(compressed, 'utf8')).toBeLessThan(Buffer.byteLength(serialized, 'utf8')); + expect(restored).toEqual(serialized); + }); + + it('passes a small payload through without compression', () => { + const payload = { message: 'small payload' }; + const serialized = JSON.stringify(payload); + + const compressed = compressPayload(payload, { thresholdBytes: DEFAULT_COMPRESSION_THRESHOLD_BYTES + 10000 }); + + expect(compressed).toEqual(serialized); + expect(decompressPayload(compressed)).toEqual(serialized); + }); + + it('falls back safely for legacy uncompressed payloads', () => { + const legacyPayload = JSON.stringify({ message: 'legacy payload' }); + + expect(decompressPayload(legacyPayload)).toEqual(legacyPayload); + }); + + it('handles invalid or corrupted compressed input without throwing', () => { + const corruptedPayload = `${COMPRESSION_PREFIX}not-valid-base64`; + + expect(() => decompressPayload(corruptedPayload)).not.toThrow(); + expect(decompressPayload(corruptedPayload)).toEqual(corruptedPayload); + }); + + it('passes unsupported payload formats through safely', () => { + const unsupportedPayload = 42 as unknown; + + expect(compressPayload(unsupportedPayload)).toBe('42'); + expect(decompressPayload(unsupportedPayload)).toBe('42'); + }); +}); diff --git a/listener/src/utils/payload-compression.ts b/listener/src/utils/payload-compression.ts new file mode 100644 index 0000000..601eef5 --- /dev/null +++ b/listener/src/utils/payload-compression.ts @@ -0,0 +1,102 @@ +import { gzipSync, gunzipSync } from 'zlib'; +import logger from './logger'; + +export const COMPRESSION_PREFIX = '__COMPRESSED__:'; +export const DEFAULT_COMPRESSION_THRESHOLD_BYTES = 10 * 1024; + +interface CompressionOptions { + thresholdBytes?: number; +} + +interface CompressionMetrics { + originalSizeBytes: number; + compressedSizeBytes: number; + reductionPercent: number; + wasCompressed: boolean; +} + +function toSerializableString(payload: unknown): string { + if (typeof payload === 'string') { + return payload; + } + + if (payload === null || payload === undefined) { + return JSON.stringify(payload); + } + + if (typeof payload === 'object' || typeof payload === 'number' || typeof payload === 'boolean') { + return JSON.stringify(payload); + } + + return String(payload); +} + +function calculateMetrics(originalSizeBytes: number, compressedSizeBytes: number): CompressionMetrics { + const reductionPercent = originalSizeBytes > 0 + ? Math.max(0, Math.round(((originalSizeBytes - compressedSizeBytes) / originalSizeBytes) * 100)) + : 0; + + return { + originalSizeBytes, + compressedSizeBytes, + reductionPercent, + wasCompressed: compressedSizeBytes < originalSizeBytes, + }; +} + +export function compressPayload(payload: unknown, options: CompressionOptions = {}): string { + const threshold = options.thresholdBytes ?? DEFAULT_COMPRESSION_THRESHOLD_BYTES; + const serialized = toSerializableString(payload); + const originalSize = Buffer.byteLength(serialized, 'utf8'); + + if (originalSize < threshold) { + return serialized; + } + + try { + const compressedBuffer = gzipSync(serialized); + const compressedSize = compressedBuffer.length; + const metrics = calculateMetrics(originalSize, compressedSize); + + logger.info('Payload compressed', { + originalSizeBytes: metrics.originalSizeBytes, + compressedSizeBytes: metrics.compressedSizeBytes, + reductionPercent: metrics.reductionPercent, + }); + + return `${COMPRESSION_PREFIX}${compressedBuffer.toString('base64')}`; + } catch (error) { + logger.warn('Payload compression failed, using original payload', { error }); + return serialized; + } +} + +export function decompressPayload(compressedPayload: unknown): string { + if (typeof compressedPayload !== 'string') { + return toSerializableString(compressedPayload); + } + + if (!compressedPayload.startsWith(COMPRESSION_PREFIX)) { + return compressedPayload; + } + + const base64Payload = compressedPayload.slice(COMPRESSION_PREFIX.length); + + try { + const buffer = Buffer.from(base64Payload, 'base64'); + const decompressed = gunzipSync(buffer); + return decompressed.toString('utf8'); + } catch (error) { + logger.warn('Payload decompression failed, returning original value', { error, payloadPreview: compressedPayload.slice(0, 80) }); + return compressedPayload; + } +} + +export function logCompressionMetrics(metrics: CompressionMetrics): void { + logger.info('Payload compression metrics', { + originalSizeBytes: metrics.originalSizeBytes, + compressedSizeBytes: metrics.compressedSizeBytes, + reductionPercent: metrics.reductionPercent, + wasCompressed: metrics.wasCompressed, + }); +}