From 7e185bb890bc4dd7176fbdac1d798770ac40ecdf Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Tue, 12 May 2026 18:21:28 +0200 Subject: [PATCH 1/3] feat(webhooks): verifyAndParse* API for compressed payloads (CHA-3071) Adds first-class support for gzip-compressed webhook payloads (HTTP webhooks, SQS, SNS) and exposes the cross-SDK verifyAndParse* / parseSqs / parseSns contract that the other Stream SDKs ship in the CHA-3071 rollout. New module: src/utils/webhook.ts Primitives: - gunzipPayload(body) - gzip-magic-byte detection (RFC 1952 1f 8b), no-op when not compressed - decodeSqsPayload(body) - strict base64 decode then gunzip-if-magic - decodeSnsPayload(envelope) - JSON-parse the SNS HTTP notification, extract Message, run the SQS pipeline; falls through to a pre-extracted Message string for backward compatibility - verifySignature(...) - constant-time HMAC-SHA256 over the uncompressed body - parseEvent(payload) - JSON -> typed WHEvent Composites (return typed WHEvent): - verifyAndParseWebhook(rawBody, signature, secret) - parseSqs(messageBody) - parseSns(notificationBody) StreamClient#verifyAndParseWebhook / parseSqs / parseSns use the configured client secret automatically. Backward compatibility ---------------------- StreamClient#verifyWebhook (boolean) is unchanged. Unified error handling ---------------------- Every failure path terminates at a single class - InvalidWebhookError - with one of four canonical messages exported as InvalidWebhookErrorMessages: signatureMismatch / invalidBase64 / gzipFailed / invalidJson. Customers only need one catch arm; the message identifies which mode fired. Tests ----- __tests__/webhook-compression.test.ts covers plain / gzip / base64+gzip payloads, signature mismatches, malformed bytes, SNS envelope unwrapping with and without leading whitespace, and JSON parsing into a typed WHEvent. --- __tests__/webhook-compression.test.ts | 284 ++++++++++++++++++++++++++ index.ts | 12 ++ src/StreamClient.ts | 51 +++++ src/utils/webhook.ts | 206 +++++++++++++++++++ 4 files changed, 553 insertions(+) create mode 100644 __tests__/webhook-compression.test.ts create mode 100644 src/utils/webhook.ts diff --git a/__tests__/webhook-compression.test.ts b/__tests__/webhook-compression.test.ts new file mode 100644 index 0000000..6b88477 --- /dev/null +++ b/__tests__/webhook-compression.test.ts @@ -0,0 +1,284 @@ +import crypto from 'crypto'; +import zlib from 'zlib'; + +import { beforeEach, describe, expect, it } from 'vitest'; + +import { StreamClient } from '../src/StreamClient'; +import { + decodeSnsPayload, + decodeSqsPayload, + gunzipPayload, + InvalidWebhookError, + InvalidWebhookErrorMessages, + parseEvent, + parseSns, + parseSqs, + verifyAndParseWebhook, + verifySignature, +} from '../src/utils/webhook'; + +const JSON_BODY = + '{"type":"user.updated","created_at":"2026-04-08T14:23:35.879421674Z","user":{"id":"u-1"}}'; +const API_KEY = 'tkey'; +const API_SECRET = 'tsec2'; + +const sign = (body: Buffer | string) => + crypto + .createHmac('sha256', Buffer.from(API_SECRET, 'utf8')) + .update(body) + .digest('hex'); + +const gzip = (body: Buffer | string) => + zlib.gzipSync(Buffer.isBuffer(body) ? body : Buffer.from(body)); + +const base64 = (body: Buffer | string) => + (Buffer.isBuffer(body) ? body : Buffer.from(body)).toString('base64'); + +const snsEnvelope = (innerMessage: string) => + JSON.stringify({ + Type: 'Notification', + MessageId: '22b80b92-fdea-4c2c-8f9d-bdfb0c7bf324', + TopicArn: 'arn:aws:sns:us-east-1:123456789012:stream-webhooks', + Message: innerMessage, + Timestamp: '2026-05-11T10:00:00.000Z', + SignatureVersion: '1', + MessageAttributes: { + 'X-Signature': { Type: 'String', Value: '' }, + }, + }); + +describe('Webhook verification + parsing', () => { + let client: StreamClient; + + beforeEach(() => { + client = new StreamClient(API_KEY, API_SECRET); + }); + + describe('verifyWebhook (legacy boolean helper, unchanged)', () => { + it('validates a plain JSON body with its HMAC signature', () => { + expect(client.verifyWebhook(JSON_BODY, sign(JSON_BODY))).toBe(true); + }); + + it('rejects when signature is wrong', () => { + expect(client.verifyWebhook(JSON_BODY, 'deadbeef')).toBe(false); + }); + }); + + describe('verifySignature', () => { + it('returns true for matching HMAC', () => { + expect(verifySignature(JSON_BODY, sign(JSON_BODY), API_SECRET)).toBe( + true, + ); + }); + + it('returns false for mismatched signature', () => { + expect(verifySignature(JSON_BODY, '0'.repeat(64), API_SECRET)).toBe( + false, + ); + }); + + it('returns false for wrong secret', () => { + const sig = crypto + .createHmac('sha256', 'other') + .update(JSON_BODY) + .digest('hex'); + expect(verifySignature(JSON_BODY, sig, API_SECRET)).toBe(false); + }); + + it('rejects signatures computed over compressed bytes', () => { + const compressed = gzip(JSON_BODY); + expect(verifySignature(JSON_BODY, sign(compressed), API_SECRET)).toBe( + false, + ); + }); + }); + + describe('gunzipPayload', () => { + it('passes through plain bytes unchanged', () => { + expect(gunzipPayload(JSON_BODY).toString('utf8')).toBe(JSON_BODY); + }); + + it('passes through Buffer input unchanged', () => { + expect(gunzipPayload(Buffer.from(JSON_BODY)).toString('utf8')).toBe( + JSON_BODY, + ); + }); + + it('inflates gzip-magic bytes', () => { + expect(gunzipPayload(gzip(JSON_BODY)).toString('utf8')).toBe(JSON_BODY); + }); + + it('returns Buffer in all cases', () => { + expect(Buffer.isBuffer(gunzipPayload(JSON_BODY))).toBe(true); + expect(Buffer.isBuffer(gunzipPayload(gzip(JSON_BODY)))).toBe(true); + }); + + it('handles empty input', () => { + expect(gunzipPayload(Buffer.alloc(0)).length).toBe(0); + }); + + it('throws InvalidWebhookError on truncated gzip with magic', () => { + const bad = Buffer.concat([ + Buffer.from([0x1f, 0x8b]), + Buffer.from([0, 0, 0]), + ]); + expect(() => gunzipPayload(bad)).toThrow(InvalidWebhookError); + expect(() => gunzipPayload(bad)).toThrow( + InvalidWebhookErrorMessages.gzipFailed, + ); + }); + }); + + describe('decodeSqsPayload', () => { + it('decodes base64 only (no compression)', () => { + expect(decodeSqsPayload(base64(JSON_BODY)).toString('utf8')).toBe( + JSON_BODY, + ); + }); + + it('decodes base64 + gzip', () => { + expect(decodeSqsPayload(base64(gzip(JSON_BODY))).toString('utf8')).toBe( + JSON_BODY, + ); + }); + + it('throws InvalidWebhookError on malformed base64', () => { + expect(() => decodeSqsPayload('!!!not-base64!!!')).toThrow( + InvalidWebhookError, + ); + expect(() => decodeSqsPayload('!!!not-base64!!!')).toThrow( + InvalidWebhookErrorMessages.invalidBase64, + ); + }); + }); + + describe('decodeSnsPayload', () => { + it('treats a pre-extracted Message identically to decodeSqsPayload', () => { + const wrapped = base64(gzip(JSON_BODY)); + expect(decodeSnsPayload(wrapped).equals(decodeSqsPayload(wrapped))).toBe( + true, + ); + }); + + it('round-trips base64 + gzip (pre-extracted Message)', () => { + expect(decodeSnsPayload(base64(gzip(JSON_BODY))).toString('utf8')).toBe( + JSON_BODY, + ); + }); + + it('unwraps a full SNS HTTP notification envelope', () => { + const wrapped = base64(gzip(JSON_BODY)); + const envelope = snsEnvelope(wrapped); + expect(decodeSnsPayload(envelope).toString('utf8')).toBe(JSON_BODY); + }); + + it('handles whitespace before the envelope JSON', () => { + const wrapped = base64(gzip(JSON_BODY)); + const envelope = `\n ${snsEnvelope(wrapped)}`; + expect(decodeSnsPayload(envelope).toString('utf8')).toBe(JSON_BODY); + }); + }); + + describe('parseEvent', () => { + it('parses Buffer payload into a typed event', () => { + const ev = parseEvent(Buffer.from(JSON_BODY)); + expect(ev.type).toBe('user.updated'); + }); + + it('parses string payload', () => { + const ev = parseEvent(JSON_BODY); + expect(ev.type).toBe('user.updated'); + }); + + it('throws InvalidWebhookError on malformed JSON', () => { + expect(() => parseEvent('not json')).toThrow(InvalidWebhookError); + expect(() => parseEvent('not json')).toThrow( + InvalidWebhookErrorMessages.invalidJson, + ); + }); + }); + + describe('verifyAndParseWebhook', () => { + it('parses a plain HTTP webhook with a valid signature', () => { + const ev = client.verifyAndParseWebhook(JSON_BODY, sign(JSON_BODY)); + expect(ev.type).toBe('user.updated'); + }); + + it('parses a gzip-compressed HTTP webhook', () => { + const ev = client.verifyAndParseWebhook(gzip(JSON_BODY), sign(JSON_BODY)); + expect(ev.type).toBe('user.updated'); + }); + + it('throws InvalidWebhookError on signature mismatch', () => { + expect(() => client.verifyAndParseWebhook(JSON_BODY, 'deadbeef')).toThrow( + InvalidWebhookError, + ); + expect(() => client.verifyAndParseWebhook(JSON_BODY, 'deadbeef')).toThrow( + InvalidWebhookErrorMessages.signatureMismatch, + ); + }); + + it('rejects a gzip body when the signature was computed over compressed bytes', () => { + const compressed = gzip(JSON_BODY); + expect(() => + client.verifyAndParseWebhook(compressed, sign(compressed)), + ).toThrow(InvalidWebhookError); + }); + + it('also works as a package-level function', () => { + const ev = verifyAndParseWebhook(JSON_BODY, sign(JSON_BODY), API_SECRET); + expect(ev.type).toBe('user.updated'); + }); + }); + + describe('parseSqs', () => { + it('parses a base64-only SQS body', () => { + const ev = client.parseSqs(base64(JSON_BODY)); + expect(ev.type).toBe('user.updated'); + }); + + it('parses a base64 + gzip SQS body', () => { + const wrapped = base64(gzip(JSON_BODY)); + const ev = client.parseSqs(wrapped); + expect(ev.type).toBe('user.updated'); + }); + + it('also works as a package-level function', () => { + const wrapped = base64(gzip(JSON_BODY)); + const ev = parseSqs(wrapped); + expect(ev.type).toBe('user.updated'); + }); + + it('surfaces malformed base64 as InvalidWebhookError', () => { + expect(() => client.parseSqs('!!!not-base64!!!')).toThrow( + InvalidWebhookError, + ); + }); + }); + + describe('parseSns', () => { + it('parses a pre-extracted base64 + gzip SNS message', () => { + const wrapped = base64(gzip(JSON_BODY)); + const ev = client.parseSns(wrapped); + expect(ev.type).toBe('user.updated'); + }); + + it('produces the same event as parseSqs (pre-extracted Message)', () => { + const wrapped = base64(gzip(JSON_BODY)); + expect(client.parseSns(wrapped)).toEqual(client.parseSqs(wrapped)); + }); + + it('parses a full SNS HTTP notification envelope', () => { + const wrapped = base64(gzip(JSON_BODY)); + const envelope = snsEnvelope(wrapped); + const ev = client.parseSns(envelope); + expect(ev.type).toBe('user.updated'); + }); + + it('also works as a package-level function', () => { + const wrapped = base64(gzip(JSON_BODY)); + const ev = parseSns(wrapped); + expect(ev.type).toBe('user.updated'); + }); + }); +}); diff --git a/index.ts b/index.ts index 3914180..9177b90 100644 --- a/index.ts +++ b/index.ts @@ -6,3 +6,15 @@ export * from './src/StreamVideoClient'; export * from './src/gen/models'; export * from './src/StreamFeedsClient'; export * from './src/StreamFeed'; +export { + InvalidWebhookError, + InvalidWebhookErrorMessages, + decodeSnsPayload, + decodeSqsPayload, + gunzipPayload, + parseEvent, + parseSns, + parseSqs, + verifyAndParseWebhook, + verifySignature, +} from './src/utils/webhook'; diff --git a/src/StreamClient.ts b/src/StreamClient.ts index a4f2cde..171db75 100644 --- a/src/StreamClient.ts +++ b/src/StreamClient.ts @@ -1,4 +1,11 @@ import { JWTServerToken, JWTUserToken } from './utils/create-token'; +import { + InvalidWebhookError, + InvalidWebhookErrorMessages, + parseSns as parseSnsHelper, + parseSqs as parseSqsHelper, + verifyAndParseWebhook as verifyAndParseWebhookHelper, +} from './utils/webhook'; import { CommonApi } from './gen/common/CommonApi'; import { StreamVideoClient } from './StreamVideoClient'; import crypto from 'crypto'; @@ -253,4 +260,48 @@ export class StreamClient extends CommonApi { return false; } }; + + /** + * Verify and parse an HTTP webhook event. + * + * Decompresses `rawBody` when gzipped (detected from the body bytes), + * verifies the `X-Signature` header against the app's API secret, and + * returns the parsed typed event. Works whether or not Stream is + * currently compressing payloads for this app, and stays correct behind + * middleware that auto-decompresses the request. + * + * @param rawBody Raw HTTP request body bytes Stream signed + * @param signature Value of the `X-Signature` header + * @throws {InvalidWebhookError} When the signature does not match or + * the gzip / JSON envelope is malformed. + */ + verifyAndParseWebhook = (rawBody: string | Buffer, signature: string) => { + if (!this.secret) { + throw new InvalidWebhookError( + 'cannot verify webhook signature without an API secret on the client', + ); + } + return verifyAndParseWebhookHelper(rawBody, signature, this.secret); + }; + + /** + * Parse an SQS firehose event: decodes the message `Body` (base64 + + * optional gzip) and returns the parsed typed event. No HMAC + * verification (Stream does not sign SQS bodies). + * + * @param messageBody SQS message `Body` string + * @throws {InvalidWebhookError} When the base64 / gzip envelope is malformed. + */ + parseSqs = (messageBody: string) => parseSqsHelper(messageBody); + + /** + * Parse an SNS-delivered event (unwraps envelope JSON when needed, then + * same decode path as SQS). No HMAC verification. + * + * @param notificationBody Raw SNS POST body or pre-extracted `Message` string + * @throws {InvalidWebhookError} When the envelope cannot be decoded. + */ + parseSns = (notificationBody: string) => parseSnsHelper(notificationBody); } + +export { InvalidWebhookError, InvalidWebhookErrorMessages }; diff --git a/src/utils/webhook.ts b/src/utils/webhook.ts new file mode 100644 index 0000000..add00b1 --- /dev/null +++ b/src/utils/webhook.ts @@ -0,0 +1,206 @@ +import crypto from 'crypto'; +import zlib from 'zlib'; +import type { WHEvent } from '../gen/models'; + +/** + * Canonical failure-mode messages for {@link InvalidWebhookError}. + * + * Callers that prefer exact-match filtering (security logging, retry + * policy) over substring matches can compare `err.message` to these + * constants instead of pattern-matching free-form text. + */ +export const InvalidWebhookErrorMessages = { + signatureMismatch: 'signature mismatch', + invalidBase64: 'invalid base64 encoding', + gzipFailed: 'gzip decompression failed', + invalidJson: 'invalid JSON payload', +} as const; + +/** + * Thrown by {@link verifyAndParseWebhook} when the supplied `x-signature` + * does not match the HMAC of the uncompressed payload, and by all webhook + * helpers (including {@link parseSqs} / {@link parseSns}) when a + * gzip / base64 / JSON envelope is malformed. + * + * The message identifies which failure mode fired. See + * {@link InvalidWebhookErrorMessages} for the canonical strings. + */ +export class InvalidWebhookError extends Error { + public name = 'InvalidWebhookError'; + + constructor(message: string = InvalidWebhookErrorMessages.signatureMismatch) { + super(message); + } +} + +const GZIP_MAGIC = Buffer.from([0x1f, 0x8b]); + +/** + * Returns `body` as a `Buffer`, gzip-decompressed when its first two + * bytes match the gzip magic (`1f 8b`, per RFC 1952). When the body is + * plain JSON (no compression, or middleware already decompressed), the + * bytes are returned unchanged. + * + * Magic-byte detection (rather than relying on a header) keeps the same + * handler correct when middleware - Express, Next.js, AWS Lambda - + * auto-decompresses the request before your code sees it. + */ +export function gunzipPayload(rawBody: string | Buffer): Buffer { + const body = Buffer.isBuffer(rawBody) ? rawBody : Buffer.from(rawBody); + if (body.length >= 2 && body.subarray(0, 2).equals(GZIP_MAGIC)) { + try { + return zlib.gunzipSync(body); + } catch { + throw new InvalidWebhookError(InvalidWebhookErrorMessages.gzipFailed); + } + } + return body; +} + +/** + * Reverses the SQS firehose envelope: the message `Body` is + * base64-decoded, then the result is gzip-decompressed when it begins + * with the gzip magic. Returns the raw JSON `Buffer` Stream signed. + * + * SQS bodies are always base64-encoded so they remain valid UTF-8 over + * the queue. The same call works whether or not Stream is currently + * compressing payloads for this app. + */ +export function decodeSqsPayload(body: string): Buffer { + // Reject anything that isn't canonical base64 up front. Node's base64 + // decoder is permissive (silently strips characters outside the + // alphabet, accepts both standard and URL-safe variants), so we have + // to be strict here to avoid silently corrupting the body before the + // signature check runs. + if (!/^[A-Za-z0-9+/]*={0,2}$/.test(body) || body.length % 4 !== 0) { + throw new InvalidWebhookError(InvalidWebhookErrorMessages.invalidBase64); + } + const decoded = Buffer.from(body, 'base64'); + if (decoded.toString('base64').length !== body.length) { + throw new InvalidWebhookError(InvalidWebhookErrorMessages.invalidBase64); + } + return gunzipPayload(decoded); +} + +/** + * Reverses an SNS HTTP notification envelope. When `notificationBody` + * is a JSON envelope (`{"Type":"Notification","Message":"..."}`), the + * inner `Message` field is extracted and run through the SQS pipeline + * (base64-decode, then gzip-if-magic). When the input is not a JSON + * envelope it is treated as the already-extracted `Message` string, + * so call sites that pre-unwrap continue to work. + */ +export function decodeSnsPayload(notificationBody: string): Buffer { + const inner = extractSnsMessage(notificationBody); + return decodeSqsPayload(inner ?? notificationBody); +} + +function extractSnsMessage(notificationBody: string): string | null { + const trimmed = notificationBody.replace(/^[\s\uFEFF]+/, ''); + if (!trimmed.startsWith('{')) { + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch { + return null; + } + if ( + parsed === null || + typeof parsed !== 'object' || + Array.isArray(parsed) || + typeof (parsed as { Message?: unknown }).Message !== 'string' + ) { + return null; + } + return (parsed as { Message: string }).Message; +} + +/** + * Constant-time HMAC-SHA256 verification of `signature` against the + * digest of `body` using `secret` as the key. The signature is always + * computed over the **uncompressed** JSON bytes, so callers that + * decoded a gzipped or base64-wrapped payload must pass the inflated + * bytes here. + * + * The legacy `client.verifyWebhook` helper wraps this function, so + * callers that have already migrated to `verifyAndParseWebhook`, + * `parseSqs`, or `parseSns` rarely need to invoke this directly. + */ +export function verifySignature( + body: string | Buffer, + signature: string, + secret: string, +): boolean { + const key = Buffer.from(secret, 'utf8'); + const hash = crypto.createHmac('sha256', key).update(body).digest('hex'); + try { + return crypto.timingSafeEqual(Buffer.from(hash), Buffer.from(signature)); + } catch { + return false; + } +} + +/** + * Parse a JSON-encoded webhook event into a typed {@link WHEvent}. New + * event types Stream introduces still parse successfully — the runtime + * shape is the JSON Stream sent and the `type` field stays preserved. + */ +export function parseEvent(payload: Buffer | string): WHEvent { + const text = Buffer.isBuffer(payload) ? payload.toString('utf8') : payload; + try { + return JSON.parse(text) as WHEvent; + } catch { + throw new InvalidWebhookError(InvalidWebhookErrorMessages.invalidJson); + } +} + +function verifyAndParse( + payload: Buffer, + signature: string, + secret: string, +): WHEvent { + if (!verifySignature(payload, signature, secret)) { + throw new InvalidWebhookError( + InvalidWebhookErrorMessages.signatureMismatch, + ); + } + return parseEvent(payload); +} + +/** + * Decompress (when gzipped), verify the HMAC `signature`, and return + * the parsed {@link WHEvent}. + * + * @param rawBody Raw HTTP request body bytes Stream signed + * @param signature Value of the `X-Signature` header + * @param secret Your app's API secret + * @throws {InvalidWebhookError} When the signature does not match or + * the gzip / base64 / JSON envelope is malformed. + */ +export function verifyAndParseWebhook( + rawBody: string | Buffer, + signature: string, + secret: string, +): WHEvent { + return verifyAndParse(gunzipPayload(rawBody), signature, secret); +} + +/** + * Decode the SQS message `Body` (base64, then gzip-if-magic) and return + * the parsed {@link WHEvent}. Stream does not attach an application-level + * HMAC to SQS deliveries — use {@link verifyAndParseWebhook} for HTTP + * webhooks. + */ +export function parseSqs(messageBody: string): WHEvent { + return parseEvent(decodeSqsPayload(messageBody)); +} + +/** + * Decode an SNS notification (unwrap the JSON envelope when needed; same + * inner format as SQS). No application-level HMAC verification. + */ +export function parseSns(notificationBody: string): WHEvent { + return parseEvent(decodeSnsPayload(notificationBody)); +} From 6aafdaedf3450f793b4f936cb1d0dd9ba896d646 Mon Sep 17 00:00:00 2001 From: Zita Szupera Date: Tue, 12 May 2026 11:40:33 -0500 Subject: [PATCH 2/3] refactor: remove unnecessary exports --- index.ts | 8 -------- src/StreamClient.ts | 2 -- 2 files changed, 10 deletions(-) diff --git a/index.ts b/index.ts index 9177b90..71c9a50 100644 --- a/index.ts +++ b/index.ts @@ -9,12 +9,4 @@ export * from './src/StreamFeed'; export { InvalidWebhookError, InvalidWebhookErrorMessages, - decodeSnsPayload, - decodeSqsPayload, - gunzipPayload, - parseEvent, - parseSns, - parseSqs, - verifyAndParseWebhook, - verifySignature, } from './src/utils/webhook'; diff --git a/src/StreamClient.ts b/src/StreamClient.ts index 171db75..eead54f 100644 --- a/src/StreamClient.ts +++ b/src/StreamClient.ts @@ -303,5 +303,3 @@ export class StreamClient extends CommonApi { */ parseSns = (notificationBody: string) => parseSnsHelper(notificationBody); } - -export { InvalidWebhookError, InvalidWebhookErrorMessages }; From 3eeb74717c0ec0107becb04231f34378e5822e43 Mon Sep 17 00:00:00 2001 From: Zita Szupera Date: Tue, 12 May 2026 11:42:48 -0500 Subject: [PATCH 3/3] fix: lint error --- src/StreamClient.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/StreamClient.ts b/src/StreamClient.ts index eead54f..13e7fcb 100644 --- a/src/StreamClient.ts +++ b/src/StreamClient.ts @@ -1,7 +1,6 @@ import { JWTServerToken, JWTUserToken } from './utils/create-token'; import { InvalidWebhookError, - InvalidWebhookErrorMessages, parseSns as parseSnsHelper, parseSqs as parseSqsHelper, verifyAndParseWebhook as verifyAndParseWebhookHelper,