From 438566fea4f0933a5c84ea23fd1e62857711b3d0 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Thu, 7 May 2026 12:14:39 +0200 Subject: [PATCH 01/15] feat(webhooks): add verify_and_decode_webhook for compressed payloads (CHA-3071) Stream Chat backend can now compress outbound webhook payloads with gzip and, for SQS / SNS firehose delivery, base64-wrap the compressed bytes so they remain valid UTF-8 over the queue. Add two new client methods that let customers decompress + verify in a single call: - decompress_webhook_body(body, content_encoding=None, payload_encoding=None) primitive decode that handles gzip and/or base64 - verify_and_decode_webhook(body, x_signature, content_encoding=None, payload_encoding=None) decode + HMAC-SHA256 verify Both are exposed on the sync StreamChat and async StreamChatAsync clients through the shared StreamChatInterface base, mirroring the existing verify_webhook helper. The existing verify_webhook signature and behavior are unchanged for backward compatibility. A new WebhookSignatureError (extends StreamAPIException) is raised on signature mismatch, malformed gzip, or malformed base64. Unsupported encoding values raise ValueError with a message that points at the supported algorithm (gzip). The decoding logic lives in stream_chat/webhook.py so it can be tested without instantiating an HTTP client. The new tests cover the cross-SDK contract: passthrough, gzip round-trip, base64 round-trip, base64 + gzip (SQS / SNS shape), case-insensitive aliases, every unsupported content_encoding (br / brotli / zstd / deflate / compress / lz4), unsupported payload_encoding (hex / url / binary), invalid gzip / base64 input, and three signature-mismatch variants (wrong signature, signature over compressed bytes, signature over wrapped bytes). Docs: webhooks_overview.md gets a "Compressed webhook bodies" section with Django, Flask, and SQS / SNS usage examples. Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 73 +++++ stream_chat/base/client.py | 79 +++++ stream_chat/base/exceptions.py | 14 + stream_chat/tests/test_webhook_compression.py | 274 ++++++++++++++++++ stream_chat/webhook.py | 137 +++++++++ 5 files changed, 577 insertions(+) create mode 100644 stream_chat/tests/test_webhook_compression.py create mode 100644 stream_chat/webhook.py diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index e9cff35f..58313f08 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -90,6 +90,79 @@ valid = client.verify_webhook(request.body, request.META['HTTP_X_SIGNATURE']) valid = client.verify_webhook(request.data, request.headers['X-SIGNATURE']) ``` +### Compressed webhook bodies + +GZIP compression can be enabled for hooks payloads from the Dashboard. Enabling compression reduces the payload size significantly (often 70–90% smaller) reducing your bandwidth usage on Stream. The computation overhead introduced by the decompression step is usually negligible and offset by the much smaller payload. + +When payload compression is enabled, webhook HTTP requests will include the `Content-Encoding: gzip` header and the request body will be compressed with GZIP. Some HTTP servers and middleware (Rails, Django, Laravel, Spring Boot, ASP.NET) handle this transparently and strip the header before your handler runs — in that case the body you see is already raw JSON. + +Before enabling compression, make sure that: + +* Your backend integration is using a recent version of our official SDKs with compression support +* If you don't use an official SDK, make sure that your code supports receiving compressed payloads +* The payload signature check is done on the **uncompressed** payload + +The Python SDK ships with `client.verify_and_decode_webhook(...)` which transparently handles plain, gzip-compressed, and base64-wrapped (SQS / SNS firehose) payloads. It returns the raw JSON body as `bytes`, ready to pass to `json.loads`. + +```python +import json +from stream_chat import StreamChat + +client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") + +# Django view +def stream_webhook(request): + body = client.verify_and_decode_webhook( + request.body, + request.headers["X-Signature"], + request.headers.get("Content-Encoding"), + ) + event = json.loads(body) + # ... handle event ... +``` + +```python +import json +from flask import request +from stream_chat import StreamChat + +client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") + +@app.route("/webhooks/stream", methods=["POST"]) +def stream_webhook(): + body = client.verify_and_decode_webhook( + request.get_data(), + request.headers["X-Signature"], + request.headers.get("Content-Encoding"), + ) + event = json.loads(body) + # ... handle event ... +``` + +If your HTTP framework or a middleware already decompressed the body before it reached your handler, the `Content-Encoding` header will be missing (or set to `identity`) and `verify_and_decode_webhook` will be a no-op for the decompression step — the same call works in both cases. + +`verify_and_decode_webhook` raises `stream_chat.base.exceptions.WebhookSignatureError` when the signature does not match or the body cannot be decoded. + +The original `client.verify_webhook(request.body, request.headers["X-Signature"])` is unchanged and still available for handlers that prefer to verify and parse the body separately. + +#### SQS / SNS firehose + +When delivering events through SQS or SNS, Stream base64-wraps the (possibly gzip-compressed) body so the payload stays valid UTF-8 over the queue. Pass `payload_encoding="base64"` so `verify_and_decode_webhook` unwraps the envelope before verifying the HMAC signature, which is always computed over the uncompressed JSON. + +```python +body = client.verify_and_decode_webhook( + sqs_message["Body"], + sqs_message["MessageAttributes"]["X-Signature"]["StringValue"], + content_encoding=sqs_message["MessageAttributes"] + .get("Content-Encoding", {}) + .get("StringValue"), + payload_encoding="base64", +) +event = json.loads(body) +``` + +If you only need to decode the body without checking the signature (for example because you have already verified it elsewhere), use `client.decompress_webhook_body(body, content_encoding, payload_encoding)`. + All webhook requests contain these headers: | Name | Description | Example | diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 750fcc83..1230c65a 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -133,6 +133,85 @@ def verify_webhook( ).hexdigest() return signature == x_signature + def decompress_webhook_body( + self, + body: Union[bytes, str], + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, + ) -> bytes: + """Decode a (possibly compressed and/or wrapped) webhook payload. + + Stream Chat can compress outbound webhook payloads with gzip and, for + SQS / SNS firehose delivery, also wrap the compressed bytes in base64 + so they remain valid UTF-8 over the queue. This helper applies the + encodings in order: + + 1. ``payload_encoding`` (``"base64"`` / ``"b64"``) is unwrapped first. + 2. ``content_encoding`` (``"gzip"``) is decompressed next. + 3. The raw JSON bytes are returned. The caller can ``.decode("utf-8")`` + or pass the value straight to :func:`json.loads`, which accepts + bytes. + + ``None`` or an empty string for either encoding is a no-op, so the + regular HTTP webhook path stays bytewise identical to today. + + This method does **not** check the ``X-Signature`` header. Use + :meth:`verify_and_decode_webhook` for the combined decode + verify + flow. + + :param body: raw bytes (or str) received from Stream + :param content_encoding: value of the ``Content-Encoding`` header + (only ``"gzip"`` is supported) + :param payload_encoding: wrapper around the compressed bytes + (``"base64"`` / ``"b64"``); used by the SQS / SNS firehose + :returns: the uncompressed JSON body as bytes + """ + from stream_chat.webhook import decompress_webhook_body + + return decompress_webhook_body( + body, + content_encoding=content_encoding, + payload_encoding=payload_encoding, + ) + + def verify_and_decode_webhook( + self, + body: Union[bytes, str], + x_signature: Union[str, bytes], + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, + ) -> bytes: + """Decode a webhook payload and verify its HMAC-SHA256 signature. + + The signature is always computed over the **uncompressed** JSON + payload, so this method first decodes the body via + :meth:`decompress_webhook_body` and then compares the digest with + ``x_signature`` using :func:`hmac.compare_digest`. + + Works for plain HTTP webhooks (pass the ``Content-Encoding`` header + value) and for SQS / SNS firehose envelopes (additionally pass + ``payload_encoding="base64"``). + + :param body: raw bytes (or str) received from Stream + :param x_signature: the ``X-Signature`` header value sent by Stream + :param content_encoding: value of the ``Content-Encoding`` header + (only ``"gzip"`` is supported) + :param payload_encoding: wrapper around the compressed bytes + (``"base64"`` / ``"b64"``); used by the SQS / SNS firehose + :returns: the verified, uncompressed JSON body as bytes + :raises stream_chat.base.exceptions.WebhookSignatureError: on + signature mismatch or any decode error + """ + from stream_chat.webhook import verify_and_decode_webhook + + return verify_and_decode_webhook( + body, + x_signature, + api_secret=self.api_secret, + content_encoding=content_encoding, + payload_encoding=payload_encoding, + ) + @abc.abstractmethod def update_app_settings( self, **settings: Any diff --git a/stream_chat/base/exceptions.py b/stream_chat/base/exceptions.py index 9ed295bb..3241f7f8 100644 --- a/stream_chat/base/exceptions.py +++ b/stream_chat/base/exceptions.py @@ -25,3 +25,17 @@ def __str__(self) -> str: return f'StreamChat error code {self.error_code}: {self.error_message}"' else: return f"StreamChat error HTTP code: {self.status_code}" + + +class WebhookSignatureError(StreamAPIException): + """Raised when an outbound webhook signature does not match, the + webhook payload cannot be decompressed, or the wrapping (e.g. base64) + cannot be decoded. + """ + + def __init__(self, message: str) -> None: + super().__init__(message, status_code=0) + self.message = message + + def __str__(self) -> str: + return f"WebhookSignatureError: {self.message}" diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py new file mode 100644 index 00000000..6b28cb67 --- /dev/null +++ b/stream_chat/tests/test_webhook_compression.py @@ -0,0 +1,274 @@ +import base64 +import gzip +import hashlib +import hmac + +import pytest + +from stream_chat import StreamChat, StreamChatAsync +from stream_chat.base.exceptions import WebhookSignatureError +from stream_chat.webhook import ( + decompress_webhook_body, + verify_and_decode_webhook, +) + +API_KEY = "tkey" +API_SECRET = "tsec2" +JSON_BODY = b'{"type":"message.new","message":{"text":"the quick brown fox"}}' + + +def _sign(body: bytes, secret: str = API_SECRET) -> str: + return hmac.new(key=secret.encode(), msg=body, digestmod=hashlib.sha256).hexdigest() + + +def _gzip(body: bytes) -> bytes: + return gzip.compress(body) + + +def _b64(body: bytes) -> bytes: + return base64.b64encode(body) + + +@pytest.fixture +def sync_client() -> StreamChat: + return StreamChat(api_key=API_KEY, api_secret=API_SECRET) + + +class TestVerifyWebhookBackwardCompat: + def test_verify_webhook_matches_signature(self, sync_client: StreamChat): + signature = _sign(JSON_BODY) + assert sync_client.verify_webhook(JSON_BODY, signature) is True + + def test_verify_webhook_rejects_bad_signature(self, sync_client: StreamChat): + assert sync_client.verify_webhook(JSON_BODY, "0" * 64) is False + + def test_verify_webhook_accepts_bytes_signature(self, sync_client: StreamChat): + signature = _sign(JSON_BODY).encode() + assert sync_client.verify_webhook(JSON_BODY, signature) is True + + +class TestDecompressWebhookBody: + def test_passthrough_when_no_encodings(self): + assert decompress_webhook_body(JSON_BODY) == JSON_BODY + + def test_passthrough_when_encodings_are_empty_strings(self): + assert ( + decompress_webhook_body(JSON_BODY, content_encoding="", payload_encoding="") + == JSON_BODY + ) + + def test_passthrough_when_encodings_are_none(self): + assert ( + decompress_webhook_body( + JSON_BODY, content_encoding=None, payload_encoding=None + ) + == JSON_BODY + ) + + def test_gzip_round_trip_bytes(self): + compressed = _gzip(JSON_BODY) + assert decompress_webhook_body(compressed, content_encoding="gzip") == JSON_BODY + + def test_gzip_round_trip_str_input(self): + compressed = _gzip(JSON_BODY) + wrapped = compressed.decode("latin-1") + assert ( + decompress_webhook_body(wrapped.encode("latin-1"), content_encoding="gzip") + == JSON_BODY + ) + + def test_base64_round_trip_no_compression(self): + wrapped = _b64(JSON_BODY) + assert decompress_webhook_body(wrapped, payload_encoding="base64") == JSON_BODY + + def test_base64_str_input(self): + wrapped_str = _b64(JSON_BODY).decode("ascii") + assert ( + decompress_webhook_body(wrapped_str, payload_encoding="base64") == JSON_BODY + ) + + def test_base64_plus_gzip_round_trip(self): + wrapped = _b64(_gzip(JSON_BODY)) + assert ( + decompress_webhook_body( + wrapped, content_encoding="gzip", payload_encoding="base64" + ) + == JSON_BODY + ) + + @pytest.mark.parametrize( + "content_encoding", + ["GZIP", "Gzip", " gzip ", "gZiP"], + ) + def test_content_encoding_is_case_insensitive(self, content_encoding: str): + compressed = _gzip(JSON_BODY) + assert ( + decompress_webhook_body(compressed, content_encoding=content_encoding) + == JSON_BODY + ) + + @pytest.mark.parametrize( + "payload_encoding", + ["BASE64", "Base64", " base64 ", "B64", "b64", " b64 "], + ) + def test_payload_encoding_aliases_and_case(self, payload_encoding: str): + wrapped = _b64(JSON_BODY) + assert ( + decompress_webhook_body(wrapped, payload_encoding=payload_encoding) + == JSON_BODY + ) + + @pytest.mark.parametrize( + "content_encoding", ["br", "brotli", "zstd", "deflate", "compress", "lz4"] + ) + def test_unsupported_content_encoding(self, content_encoding: str): + with pytest.raises(ValueError) as exc_info: + decompress_webhook_body(JSON_BODY, content_encoding=content_encoding) + message = str(exc_info.value).lower() + assert "unsupported" in message + assert "gzip" in message + + @pytest.mark.parametrize("payload_encoding", ["hex", "url", "binary"]) + def test_unsupported_payload_encoding(self, payload_encoding: str): + with pytest.raises(ValueError) as exc_info: + decompress_webhook_body(JSON_BODY, payload_encoding=payload_encoding) + message = str(exc_info.value).lower() + assert "unsupported" in message + assert "payload_encoding" in message + + def test_invalid_gzip_bytes_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + decompress_webhook_body(b"this is not gzip data", content_encoding="gzip") + assert "decompress" in str(exc_info.value).lower() + + def test_invalid_base64_input_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + decompress_webhook_body( + b"!!!not-valid-base64!!!", payload_encoding="base64" + ) + assert "payload_encoding" in str(exc_info.value).lower() + + def test_returns_bytes_type(self): + result = decompress_webhook_body(JSON_BODY) + assert isinstance(result, bytes) + + def test_unsupported_message_includes_value(self): + with pytest.raises(ValueError) as exc_info: + decompress_webhook_body(JSON_BODY, content_encoding="brotli") + assert "brotli" in str(exc_info.value) + + +class TestVerifyAndDecodeWebhookHelper: + def test_happy_path_plain(self): + signature = _sign(JSON_BODY) + assert ( + verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + == JSON_BODY + ) + + def test_happy_path_gzip(self): + compressed = _gzip(JSON_BODY) + signature = _sign(JSON_BODY) + assert ( + verify_and_decode_webhook( + compressed, + signature, + api_secret=API_SECRET, + content_encoding="gzip", + ) + == JSON_BODY + ) + + def test_happy_path_base64_plus_gzip(self): + wrapped = _b64(_gzip(JSON_BODY)) + signature = _sign(JSON_BODY) + assert ( + verify_and_decode_webhook( + wrapped, + signature, + api_secret=API_SECRET, + content_encoding="gzip", + payload_encoding="base64", + ) + == JSON_BODY + ) + + def test_signature_mismatch_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + verify_and_decode_webhook(JSON_BODY, "0" * 64, api_secret=API_SECRET) + assert "invalid webhook signature" in str(exc_info.value).lower() + + def test_signature_over_compressed_bytes_raises(self): + compressed = _gzip(JSON_BODY) + signature_over_compressed = _sign(compressed) + with pytest.raises(WebhookSignatureError): + verify_and_decode_webhook( + compressed, + signature_over_compressed, + api_secret=API_SECRET, + content_encoding="gzip", + ) + + def test_signature_over_wrapped_bytes_raises(self): + wrapped = _b64(_gzip(JSON_BODY)) + signature_over_wrapped = _sign(wrapped) + with pytest.raises(WebhookSignatureError): + verify_and_decode_webhook( + wrapped, + signature_over_wrapped, + api_secret=API_SECRET, + content_encoding="gzip", + payload_encoding="base64", + ) + + def test_bad_secret_raises(self): + signature = _sign(JSON_BODY, secret="other") + with pytest.raises(WebhookSignatureError): + verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + + def test_signature_can_be_bytes(self): + signature = _sign(JSON_BODY).encode() + assert ( + verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + == JSON_BODY + ) + + +class TestSyncClientMethods: + def test_decompress_via_client(self, sync_client: StreamChat): + wrapped = _b64(_gzip(JSON_BODY)) + assert ( + sync_client.decompress_webhook_body( + wrapped, content_encoding="gzip", payload_encoding="base64" + ) + == JSON_BODY + ) + + def test_verify_and_decode_via_client(self, sync_client: StreamChat): + signature = _sign(JSON_BODY) + compressed = _gzip(JSON_BODY) + assert ( + sync_client.verify_and_decode_webhook( + compressed, signature, content_encoding="gzip" + ) + == JSON_BODY + ) + + def test_verify_and_decode_via_client_signature_mismatch( + self, sync_client: StreamChat + ): + with pytest.raises(WebhookSignatureError): + sync_client.verify_and_decode_webhook(JSON_BODY, "0" * 64) + + +class TestAsyncClientMethods: + async def test_async_verify_and_decode_happy_path(self): + signature = _sign(JSON_BODY) + compressed = _gzip(JSON_BODY) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert ( + client.verify_and_decode_webhook( + compressed, signature, content_encoding="gzip" + ) + == JSON_BODY + ) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py new file mode 100644 index 00000000..941c32bf --- /dev/null +++ b/stream_chat/webhook.py @@ -0,0 +1,137 @@ +"""Helpers for verifying and decoding outbound Stream webhook payloads. + +Stream Chat can compress outbound webhook payloads with gzip and, for SQS / SNS +firehose delivery, also wrap the compressed bytes in base64 so they remain +valid UTF-8 over the queue. The helpers in this module mirror the cross-SDK +contract: callers can either decode the body without checking the signature +(:func:`decompress_webhook_body`) or do decode + HMAC verification in one call +(:func:`verify_and_decode_webhook`). + +The functions live outside the client classes so they can be exercised in +isolation, without instantiating an HTTP client. The client methods just +delegate here, passing ``self.api_secret``. +""" + +import base64 +import gzip +import hashlib +import hmac +from typing import Optional, Union + +from stream_chat.base.exceptions import WebhookSignatureError + +_BASE64_ALIASES = frozenset({"base64", "b64"}) +_GZIP_ALIASES = frozenset({"gzip"}) + + +def _to_bytes(body: Union[bytes, str]) -> bytes: + if isinstance(body, str): + return body.encode("utf-8") + if isinstance(body, (bytes, bytearray, memoryview)): + return bytes(body) + raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") + + +def _normalize(value: Optional[str]) -> str: + if value is None: + return "" + return value.strip().lower() + + +def decompress_webhook_body( + body: Union[bytes, str], + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, +) -> bytes: + """Decode a (possibly wrapped + compressed) webhook payload. + + Application order: + + 1. ``payload_encoding`` (``"base64"`` / ``"b64"``) is unwrapped first. + This corresponds to the SQS / SNS envelope, which base64-wraps the + compressed bytes so they stay valid UTF-8 over the queue. + 2. ``content_encoding`` (``"gzip"``) is decompressed. + 3. The resulting raw JSON bytes are returned. The caller can decode them + as UTF-8 or pass them straight to :func:`json.loads` (which accepts + bytes). + + ``None`` or an empty string for either encoding is a no-op, so the regular + HTTP webhook path (no compression, no wrapping) is just an identity + function and stays bytewise identical to today. + + :param body: raw bytes or str received from Stream + :param content_encoding: value of the ``Content-Encoding`` header (``"gzip"``) + :param payload_encoding: wrapper around the compressed bytes + (``"base64"`` / ``"b64"``) + :returns: the uncompressed JSON body as bytes + :raises WebhookSignatureError: when the body cannot be decoded with the + requested encodings + :raises ValueError: when an encoding value is not supported by this SDK + """ + data = _to_bytes(body) + + payload_enc = _normalize(payload_encoding) + if payload_enc: + if payload_enc in _BASE64_ALIASES: + try: + data = base64.b64decode(data, validate=True) + except ValueError as exc: + raise WebhookSignatureError( + f"failed to decode webhook body with payload_encoding={payload_enc!r}: {exc}" + ) + else: + raise ValueError( + f"unsupported webhook payload_encoding: {payload_encoding}. " + "This SDK only supports base64." + ) + + content_enc = _normalize(content_encoding) + if content_enc: + if content_enc in _GZIP_ALIASES: + try: + data = gzip.decompress(data) + except (gzip.BadGzipFile, OSError, EOFError) as exc: + raise WebhookSignatureError( + f"failed to decompress webhook body with Content-Encoding={content_enc!r}: {exc}" + ) + else: + raise ValueError( + f"unsupported webhook Content-Encoding: {content_encoding}. " + "This SDK only supports gzip; set webhook_compression_algorithm " + 'to "gzip" on the app config.' + ) + + return data + + +def verify_and_decode_webhook( + body: Union[bytes, str], + x_signature: Union[str, bytes], + api_secret: str, + content_encoding: Optional[str] = None, + payload_encoding: Optional[str] = None, +) -> bytes: + """Decode a webhook payload and verify its HMAC-SHA256 signature. + + The signature is always computed over the **uncompressed** JSON bytes, + so this helper first applies :func:`decompress_webhook_body` and then + compares the digest with ``x_signature`` using :func:`hmac.compare_digest`. + + :returns: the verified, uncompressed JSON body as bytes + :raises WebhookSignatureError: on signature mismatch or any decode error + """ + decoded = decompress_webhook_body( + body, content_encoding=content_encoding, payload_encoding=payload_encoding + ) + + if isinstance(x_signature, bytes): + x_signature = x_signature.decode() + + expected = hmac.new( + key=api_secret.encode(), msg=decoded, digestmod=hashlib.sha256 + ).hexdigest() + + if not hmac.compare_digest(expected, x_signature): + raise WebhookSignatureError("invalid webhook signature") + + return decoded From 7b3522fdf04d0cd7da50074abf7ef71c673400e4 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Thu, 7 May 2026 12:37:23 +0200 Subject: [PATCH 02/15] fix(tests): satisfy isort import-block rule Co-authored-by: Cursor --- stream_chat/tests/test_webhook_compression.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index 6b28cb67..dd72df3b 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -7,10 +7,7 @@ from stream_chat import StreamChat, StreamChatAsync from stream_chat.base.exceptions import WebhookSignatureError -from stream_chat.webhook import ( - decompress_webhook_body, - verify_and_decode_webhook, -) +from stream_chat.webhook import decompress_webhook_body, verify_and_decode_webhook API_KEY = "tkey" API_SECRET = "tsec2" From 7402f180a8d6f39755e0a4b67eda35597e89ce95 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Fri, 8 May 2026 15:33:12 +0200 Subject: [PATCH 03/15] refactor(webhooks): switch to verify_and_parse_* API (CHA-3071) Replaces the earlier verify_and_decode_webhook surface with the cross-SDK contract documented at https://getstream.io/chat/docs/node/webhooks_overview/. Module-level helpers in stream_chat.webhook: Primitives: ungzip_payload - gzip magic-byte detection + inflate decode_sqs_payload - base64 then ungzip-if-magic decode_sns_payload - alias for decode_sqs_payload verify_signature - constant-time HMAC-SHA256 comparison parse_event - JSON -> dict (typed event lands later) Composite (return parsed event dict): verify_and_parse_webhook verify_and_parse_sqs verify_and_parse_sns The composite functions auto-detect compression from body bytes, so the same handler stays correct whether or not Stream is currently compressing payloads, and behind middleware that auto-decompresses. Client instance methods (StreamChat / StreamChatAsync) mirror the three composite helpers with api_secret pulled from the client. The legacy verify_webhook(body, x_signature) -> bool boolean helper is unchanged for backward compatibility. Co-authored-by: Cursor --- stream_chat/base/client.py | 122 +++--- stream_chat/tests/test_webhook_compression.py | 413 +++++++++--------- stream_chat/webhook.py | 252 ++++++----- 3 files changed, 406 insertions(+), 381 deletions(-) diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 1230c65a..3b73d491 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -133,84 +133,66 @@ def verify_webhook( ).hexdigest() return signature == x_signature - def decompress_webhook_body( + def verify_and_parse_webhook( self, body: Union[bytes, str], - content_encoding: Optional[str] = None, - payload_encoding: Optional[str] = None, - ) -> bytes: - """Decode a (possibly compressed and/or wrapped) webhook payload. - - Stream Chat can compress outbound webhook payloads with gzip and, for - SQS / SNS firehose delivery, also wrap the compressed bytes in base64 - so they remain valid UTF-8 over the queue. This helper applies the - encodings in order: - - 1. ``payload_encoding`` (``"base64"`` / ``"b64"``) is unwrapped first. - 2. ``content_encoding`` (``"gzip"``) is decompressed next. - 3. The raw JSON bytes are returned. The caller can ``.decode("utf-8")`` - or pass the value straight to :func:`json.loads`, which accepts - bytes. - - ``None`` or an empty string for either encoding is a no-op, so the - regular HTTP webhook path stays bytewise identical to today. - - This method does **not** check the ``X-Signature`` header. Use - :meth:`verify_and_decode_webhook` for the combined decode + verify - flow. - - :param body: raw bytes (or str) received from Stream - :param content_encoding: value of the ``Content-Encoding`` header - (only ``"gzip"`` is supported) - :param payload_encoding: wrapper around the compressed bytes - (``"base64"`` / ``"b64"``); used by the SQS / SNS firehose - :returns: the uncompressed JSON body as bytes - """ - from stream_chat.webhook import decompress_webhook_body - - return decompress_webhook_body( - body, - content_encoding=content_encoding, - payload_encoding=payload_encoding, - ) + signature: Union[str, bytes], + ) -> Dict[str, Any]: + """Verify and parse an HTTP webhook event. + + Decompresses ``body`` when gzipped (detected from the body bytes), + verifies the ``X-Signature`` header against the app's API secret, + and returns the parsed event. The Python SDK currently returns a + ``dict``; typed event classes are planned for a future release. - def verify_and_decode_webhook( + :param body: raw HTTP request body bytes Stream signed + :param signature: ``X-Signature`` header value + :raises stream_chat.base.exceptions.WebhookSignatureError: on + signature mismatch or any decode error + """ + from stream_chat.webhook import verify_and_parse_webhook + + return verify_and_parse_webhook(body, signature, self.api_secret) + + def verify_and_parse_sqs( self, - body: Union[bytes, str], - x_signature: Union[str, bytes], - content_encoding: Optional[str] = None, - payload_encoding: Optional[str] = None, - ) -> bytes: - """Decode a webhook payload and verify its HMAC-SHA256 signature. - - The signature is always computed over the **uncompressed** JSON - payload, so this method first decodes the body via - :meth:`decompress_webhook_body` and then compares the digest with - ``x_signature`` using :func:`hmac.compare_digest`. - - Works for plain HTTP webhooks (pass the ``Content-Encoding`` header - value) and for SQS / SNS firehose envelopes (additionally pass - ``payload_encoding="base64"``). - - :param body: raw bytes (or str) received from Stream - :param x_signature: the ``X-Signature`` header value sent by Stream - :param content_encoding: value of the ``Content-Encoding`` header - (only ``"gzip"`` is supported) - :param payload_encoding: wrapper around the compressed bytes - (``"base64"`` / ``"b64"``); used by the SQS / SNS firehose - :returns: the verified, uncompressed JSON body as bytes + message_body: Union[bytes, str], + signature: Union[str, bytes], + ) -> Dict[str, Any]: + """Verify and parse an SQS firehose webhook event. + + Reverses the base64 (+ optional gzip) wrapping on the SQS + ``Body``, verifies the ``X-Signature`` message attribute against + the app's API secret, and returns the parsed event. + + :param message_body: SQS message ``Body`` (string) + :param signature: ``X-Signature`` message attribute value :raises stream_chat.base.exceptions.WebhookSignatureError: on signature mismatch or any decode error """ - from stream_chat.webhook import verify_and_decode_webhook + from stream_chat.webhook import verify_and_parse_sqs - return verify_and_decode_webhook( - body, - x_signature, - api_secret=self.api_secret, - content_encoding=content_encoding, - payload_encoding=payload_encoding, - ) + return verify_and_parse_sqs(message_body, signature, self.api_secret) + + def verify_and_parse_sns( + self, + message: Union[bytes, str], + signature: Union[str, bytes], + ) -> Dict[str, Any]: + """Verify and parse an SNS firehose webhook event. + + Reverses the base64 (+ optional gzip) wrapping on the SNS + ``Message``, verifies the ``X-Signature`` message attribute + against the app's API secret, and returns the parsed event. + + :param message: SNS notification ``Message`` field (string) + :param signature: ``X-Signature`` message attribute value + :raises stream_chat.base.exceptions.WebhookSignatureError: on + signature mismatch or any decode error + """ + from stream_chat.webhook import verify_and_parse_sns + + return verify_and_parse_sns(message, signature, self.api_secret) @abc.abstractmethod def update_app_settings( diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index dd72df3b..54c7883b 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -1,17 +1,52 @@ +"""Tests for the webhook verification + parsing helpers. + +The Python SDK exposes the cross-SDK webhook contract in two layers: + +* Module-level functions in :mod:`stream_chat.webhook`: + + * primitives - ``ungzip_payload``, ``decode_sqs_payload``, + ``decode_sns_payload``, ``verify_signature``, ``parse_event`` + * composite helpers - ``verify_and_parse_webhook``, + ``verify_and_parse_sqs``, ``verify_and_parse_sns`` + +* Client-instance forms on :class:`StreamChat` and + :class:`StreamChatAsync`. They take ``api_secret`` from the client and + delegate to the module functions. + +Tests below exercise each layer with both compressed and uncompressed +payloads and confirm that bad signatures, malformed gzip / base64, and +parsed-event return values all behave as documented. +""" + import base64 import gzip import hashlib import hmac +import json import pytest from stream_chat import StreamChat, StreamChatAsync from stream_chat.base.exceptions import WebhookSignatureError -from stream_chat.webhook import decompress_webhook_body, verify_and_decode_webhook +from stream_chat.webhook import ( + GZIP_MAGIC, + decode_sns_payload, + decode_sqs_payload, + parse_event, + ungzip_payload, + verify_and_parse_sns, + verify_and_parse_sqs, + verify_and_parse_webhook, + verify_signature, +) API_KEY = "tkey" API_SECRET = "tsec2" JSON_BODY = b'{"type":"message.new","message":{"text":"the quick brown fox"}}' +EVENT_DICT = { + "type": "message.new", + "message": {"text": "the quick brown fox"}, +} def _sign(body: bytes, secret: str = API_SECRET) -> str: @@ -22,8 +57,8 @@ def _gzip(body: bytes) -> bytes: return gzip.compress(body) -def _b64(body: bytes) -> bytes: - return base64.b64encode(body) +def _b64(body: bytes) -> str: + return base64.b64encode(body).decode("ascii") @pytest.fixture @@ -31,241 +66,221 @@ def sync_client() -> StreamChat: return StreamChat(api_key=API_KEY, api_secret=API_SECRET) -class TestVerifyWebhookBackwardCompat: - def test_verify_webhook_matches_signature(self, sync_client: StreamChat): - signature = _sign(JSON_BODY) - assert sync_client.verify_webhook(JSON_BODY, signature) is True +class TestUngzipPayload: + def test_passthrough_plain_bytes(self): + assert ungzip_payload(JSON_BODY) == JSON_BODY - def test_verify_webhook_rejects_bad_signature(self, sync_client: StreamChat): - assert sync_client.verify_webhook(JSON_BODY, "0" * 64) is False + def test_passthrough_str_input(self): + assert ungzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY - def test_verify_webhook_accepts_bytes_signature(self, sync_client: StreamChat): - signature = _sign(JSON_BODY).encode() - assert sync_client.verify_webhook(JSON_BODY, signature) is True + def test_inflates_gzip_bytes(self): + assert ungzip_payload(_gzip(JSON_BODY)) == JSON_BODY + def test_returns_bytes(self): + assert isinstance(ungzip_payload(JSON_BODY), bytes) + assert isinstance(ungzip_payload(_gzip(JSON_BODY)), bytes) -class TestDecompressWebhookBody: - def test_passthrough_when_no_encodings(self): - assert decompress_webhook_body(JSON_BODY) == JSON_BODY + def test_empty_input(self): + assert ungzip_payload(b"") == b"" - def test_passthrough_when_encodings_are_empty_strings(self): - assert ( - decompress_webhook_body(JSON_BODY, content_encoding="", payload_encoding="") - == JSON_BODY - ) + def test_short_input_below_magic_length(self): + assert ungzip_payload(b"ab") == b"ab" - def test_passthrough_when_encodings_are_none(self): - assert ( - decompress_webhook_body( - JSON_BODY, content_encoding=None, payload_encoding=None - ) - == JSON_BODY - ) + def test_truncated_gzip_with_magic_raises(self): + bad = GZIP_MAGIC + b"\x00\x00\x00" + with pytest.raises(WebhookSignatureError) as exc_info: + ungzip_payload(bad) + assert "decompress" in str(exc_info.value).lower() - def test_gzip_round_trip_bytes(self): - compressed = _gzip(JSON_BODY) - assert decompress_webhook_body(compressed, content_encoding="gzip") == JSON_BODY - def test_gzip_round_trip_str_input(self): - compressed = _gzip(JSON_BODY) - wrapped = compressed.decode("latin-1") - assert ( - decompress_webhook_body(wrapped.encode("latin-1"), content_encoding="gzip") - == JSON_BODY - ) +class TestDecodeSqsPayload: + def test_base64_only_no_compression(self): + assert decode_sqs_payload(_b64(JSON_BODY)) == JSON_BODY - def test_base64_round_trip_no_compression(self): - wrapped = _b64(JSON_BODY) - assert decompress_webhook_body(wrapped, payload_encoding="base64") == JSON_BODY + def test_base64_plus_gzip(self): + assert decode_sqs_payload(_b64(_gzip(JSON_BODY))) == JSON_BODY + + def test_accepts_str_input(self): + encoded = _b64(_gzip(JSON_BODY)) + assert isinstance(encoded, str) + assert decode_sqs_payload(encoded) == JSON_BODY + + def test_accepts_bytes_input(self): + encoded = _b64(_gzip(JSON_BODY)).encode("ascii") + assert decode_sqs_payload(encoded) == JSON_BODY + + def test_invalid_base64_raises(self): + with pytest.raises(WebhookSignatureError) as exc_info: + decode_sqs_payload("!!!not-valid-base64!!!") + assert "base64" in str(exc_info.value).lower() - def test_base64_str_input(self): - wrapped_str = _b64(JSON_BODY).decode("ascii") - assert ( - decompress_webhook_body(wrapped_str, payload_encoding="base64") == JSON_BODY - ) - def test_base64_plus_gzip_round_trip(self): +class TestDecodeSnsPayload: + def test_aliases_decode_sqs_payload(self): wrapped = _b64(_gzip(JSON_BODY)) - assert ( - decompress_webhook_body( - wrapped, content_encoding="gzip", payload_encoding="base64" - ) - == JSON_BODY - ) + assert decode_sns_payload(wrapped) == decode_sqs_payload(wrapped) - @pytest.mark.parametrize( - "content_encoding", - ["GZIP", "Gzip", " gzip ", "gZiP"], - ) - def test_content_encoding_is_case_insensitive(self, content_encoding: str): - compressed = _gzip(JSON_BODY) - assert ( - decompress_webhook_body(compressed, content_encoding=content_encoding) - == JSON_BODY - ) + def test_round_trip(self): + assert decode_sns_payload(_b64(_gzip(JSON_BODY))) == JSON_BODY - @pytest.mark.parametrize( - "payload_encoding", - ["BASE64", "Base64", " base64 ", "B64", "b64", " b64 "], - ) - def test_payload_encoding_aliases_and_case(self, payload_encoding: str): - wrapped = _b64(JSON_BODY) - assert ( - decompress_webhook_body(wrapped, payload_encoding=payload_encoding) - == JSON_BODY - ) - @pytest.mark.parametrize( - "content_encoding", ["br", "brotli", "zstd", "deflate", "compress", "lz4"] - ) - def test_unsupported_content_encoding(self, content_encoding: str): - with pytest.raises(ValueError) as exc_info: - decompress_webhook_body(JSON_BODY, content_encoding=content_encoding) - message = str(exc_info.value).lower() - assert "unsupported" in message - assert "gzip" in message - - @pytest.mark.parametrize("payload_encoding", ["hex", "url", "binary"]) - def test_unsupported_payload_encoding(self, payload_encoding: str): - with pytest.raises(ValueError) as exc_info: - decompress_webhook_body(JSON_BODY, payload_encoding=payload_encoding) - message = str(exc_info.value).lower() - assert "unsupported" in message - assert "payload_encoding" in message - - def test_invalid_gzip_bytes_raises(self): - with pytest.raises(WebhookSignatureError) as exc_info: - decompress_webhook_body(b"this is not gzip data", content_encoding="gzip") - assert "decompress" in str(exc_info.value).lower() +class TestVerifySignature: + def test_matching(self): + assert verify_signature(JSON_BODY, _sign(JSON_BODY), API_SECRET) is True - def test_invalid_base64_input_raises(self): - with pytest.raises(WebhookSignatureError) as exc_info: - decompress_webhook_body( - b"!!!not-valid-base64!!!", payload_encoding="base64" - ) - assert "payload_encoding" in str(exc_info.value).lower() - - def test_returns_bytes_type(self): - result = decompress_webhook_body(JSON_BODY) - assert isinstance(result, bytes) - - def test_unsupported_message_includes_value(self): - with pytest.raises(ValueError) as exc_info: - decompress_webhook_body(JSON_BODY, content_encoding="brotli") - assert "brotli" in str(exc_info.value) - - -class TestVerifyAndDecodeWebhookHelper: - def test_happy_path_plain(self): - signature = _sign(JSON_BODY) - assert ( - verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) - == JSON_BODY - ) + def test_mismatched_returns_false(self): + assert verify_signature(JSON_BODY, "0" * 64, API_SECRET) is False + + def test_accepts_bytes_signature(self): + sig = _sign(JSON_BODY).encode() + assert verify_signature(JSON_BODY, sig, API_SECRET) is True - def test_happy_path_gzip(self): + def test_accepts_str_body(self): + body_str = JSON_BODY.decode("utf-8") + assert verify_signature(body_str, _sign(JSON_BODY), API_SECRET) is True + + def test_wrong_secret_returns_false(self): + sig = _sign(JSON_BODY, secret="other") + assert verify_signature(JSON_BODY, sig, API_SECRET) is False + + def test_signature_must_match_uncompressed_bytes(self): compressed = _gzip(JSON_BODY) - signature = _sign(JSON_BODY) - assert ( - verify_and_decode_webhook( - compressed, - signature, - api_secret=API_SECRET, - content_encoding="gzip", - ) - == JSON_BODY - ) + sig_over_compressed = _sign(compressed) + assert verify_signature(JSON_BODY, sig_over_compressed, API_SECRET) is False - def test_happy_path_base64_plus_gzip(self): - wrapped = _b64(_gzip(JSON_BODY)) - signature = _sign(JSON_BODY) - assert ( - verify_and_decode_webhook( - wrapped, - signature, - api_secret=API_SECRET, - content_encoding="gzip", - payload_encoding="base64", - ) - == JSON_BODY - ) + +class TestParseEvent: + def test_parses_bytes(self): + assert parse_event(JSON_BODY) == EVENT_DICT + + def test_parses_str(self): + assert parse_event(JSON_BODY.decode("utf-8")) == EVENT_DICT + + def test_unknown_event_type_still_parses(self): + body = b'{"type":"a.future.event","custom":42}' + assert parse_event(body) == {"type": "a.future.event", "custom": 42} + + def test_malformed_json_raises(self): + with pytest.raises(json.JSONDecodeError): + parse_event(b"not json") + + +class TestVerifyAndParseWebhook: + def test_plain_body(self): + sig = _sign(JSON_BODY) + assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT + + def test_gzip_body(self): + sig = _sign(JSON_BODY) + assert verify_and_parse_webhook(_gzip(JSON_BODY), sig, API_SECRET) == EVENT_DICT + + def test_returns_dict(self): + sig = _sign(JSON_BODY) + result = verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) + assert isinstance(result, dict) def test_signature_mismatch_raises(self): with pytest.raises(WebhookSignatureError) as exc_info: - verify_and_decode_webhook(JSON_BODY, "0" * 64, api_secret=API_SECRET) + verify_and_parse_webhook(JSON_BODY, "0" * 64, API_SECRET) assert "invalid webhook signature" in str(exc_info.value).lower() - def test_signature_over_compressed_bytes_raises(self): + def test_signature_must_be_over_uncompressed_bytes(self): compressed = _gzip(JSON_BODY) - signature_over_compressed = _sign(compressed) + sig_over_compressed = _sign(compressed) + with pytest.raises(WebhookSignatureError): + verify_and_parse_webhook(compressed, sig_over_compressed, API_SECRET) + + def test_wrong_secret_raises(self): + sig = _sign(JSON_BODY, secret="other") with pytest.raises(WebhookSignatureError): - verify_and_decode_webhook( - compressed, - signature_over_compressed, - api_secret=API_SECRET, - content_encoding="gzip", - ) - - def test_signature_over_wrapped_bytes_raises(self): + verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) + + def test_signature_can_be_bytes(self): + sig = _sign(JSON_BODY).encode() + assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT + + +class TestVerifyAndParseSqs: + def test_base64_only(self): + wrapped = _b64(JSON_BODY) + sig = _sign(JSON_BODY) + assert verify_and_parse_sqs(wrapped, sig, API_SECRET) == EVENT_DICT + + def test_base64_plus_gzip(self): + wrapped = _b64(_gzip(JSON_BODY)) + sig = _sign(JSON_BODY) + assert verify_and_parse_sqs(wrapped, sig, API_SECRET) == EVENT_DICT + + def test_signature_mismatch_raises(self): wrapped = _b64(_gzip(JSON_BODY)) - signature_over_wrapped = _sign(wrapped) with pytest.raises(WebhookSignatureError): - verify_and_decode_webhook( - wrapped, - signature_over_wrapped, - api_secret=API_SECRET, - content_encoding="gzip", - payload_encoding="base64", - ) - - def test_bad_secret_raises(self): - signature = _sign(JSON_BODY, secret="other") + verify_and_parse_sqs(wrapped, "0" * 64, API_SECRET) + + def test_signature_over_compressed_or_wrapped_bytes_rejected(self): + wrapped = _b64(_gzip(JSON_BODY)) + sig_over_wrapped = _sign(wrapped.encode("ascii")) with pytest.raises(WebhookSignatureError): - verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) + verify_and_parse_sqs(wrapped, sig_over_wrapped, API_SECRET) - def test_signature_can_be_bytes(self): - signature = _sign(JSON_BODY).encode() - assert ( - verify_and_decode_webhook(JSON_BODY, signature, api_secret=API_SECRET) - == JSON_BODY + +class TestVerifyAndParseSns: + def test_round_trip(self): + wrapped = _b64(_gzip(JSON_BODY)) + sig = _sign(JSON_BODY) + assert verify_and_parse_sns(wrapped, sig, API_SECRET) == EVENT_DICT + + def test_matches_sqs_behaviour(self): + wrapped = _b64(_gzip(JSON_BODY)) + sig = _sign(JSON_BODY) + assert verify_and_parse_sns(wrapped, sig, API_SECRET) == verify_and_parse_sqs( + wrapped, sig, API_SECRET ) class TestSyncClientMethods: - def test_decompress_via_client(self, sync_client: StreamChat): + def test_verify_and_parse_webhook(self, sync_client: StreamChat): + sig = _sign(JSON_BODY) + assert sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + + def test_verify_and_parse_sqs(self, sync_client: StreamChat): wrapped = _b64(_gzip(JSON_BODY)) - assert ( - sync_client.decompress_webhook_body( - wrapped, content_encoding="gzip", payload_encoding="base64" - ) - == JSON_BODY - ) + sig = _sign(JSON_BODY) + assert sync_client.verify_and_parse_sqs(wrapped, sig) == EVENT_DICT - def test_verify_and_decode_via_client(self, sync_client: StreamChat): - signature = _sign(JSON_BODY) - compressed = _gzip(JSON_BODY) - assert ( - sync_client.verify_and_decode_webhook( - compressed, signature, content_encoding="gzip" - ) - == JSON_BODY - ) + def test_verify_and_parse_sns(self, sync_client: StreamChat): + wrapped = _b64(_gzip(JSON_BODY)) + sig = _sign(JSON_BODY) + assert sync_client.verify_and_parse_sns(wrapped, sig) == EVENT_DICT - def test_verify_and_decode_via_client_signature_mismatch( - self, sync_client: StreamChat - ): + def test_signature_mismatch_via_client(self, sync_client: StreamChat): with pytest.raises(WebhookSignatureError): - sync_client.verify_and_decode_webhook(JSON_BODY, "0" * 64) + sync_client.verify_and_parse_webhook(JSON_BODY, "0" * 64) + + +class TestSyncClientLegacyVerifyWebhook: + """The legacy boolean helper stays unchanged for backward compatibility.""" + + def test_returns_true_on_match(self, sync_client: StreamChat): + assert sync_client.verify_webhook(JSON_BODY, _sign(JSON_BODY)) is True + + def test_returns_false_on_mismatch(self, sync_client: StreamChat): + assert sync_client.verify_webhook(JSON_BODY, "0" * 64) is False class TestAsyncClientMethods: - async def test_async_verify_and_decode_happy_path(self): - signature = _sign(JSON_BODY) - compressed = _gzip(JSON_BODY) + async def test_verify_and_parse_webhook(self): + sig = _sign(JSON_BODY) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + + async def test_verify_and_parse_sqs(self): + wrapped = _b64(_gzip(JSON_BODY)) + sig = _sign(JSON_BODY) + async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: + assert client.verify_and_parse_sqs(wrapped, sig) == EVENT_DICT + + async def test_verify_and_parse_sns(self): + wrapped = _b64(_gzip(JSON_BODY)) + sig = _sign(JSON_BODY) async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: - assert ( - client.verify_and_decode_webhook( - compressed, signature, content_encoding="gzip" - ) - == JSON_BODY - ) + assert client.verify_and_parse_sns(wrapped, sig) == EVENT_DICT diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 941c32bf..785c0aaa 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -1,30 +1,35 @@ -"""Helpers for verifying and decoding outbound Stream webhook payloads. - -Stream Chat can compress outbound webhook payloads with gzip and, for SQS / SNS -firehose delivery, also wrap the compressed bytes in base64 so they remain -valid UTF-8 over the queue. The helpers in this module mirror the cross-SDK -contract: callers can either decode the body without checking the signature -(:func:`decompress_webhook_body`) or do decode + HMAC verification in one call -(:func:`verify_and_decode_webhook`). - -The functions live outside the client classes so they can be exercised in -isolation, without instantiating an HTTP client. The client methods just -delegate here, passing ``self.api_secret``. +"""Webhook verification and parsing helpers. + +Stream Chat can deliver outbound events as plain JSON, gzipped JSON over +HTTP, or as base64 + gzip wrapped messages over SQS / SNS. The helpers in +this module implement the cross-SDK contract documented at +https://getstream.io/chat/docs/node/webhooks_overview/. + +The composite functions (:func:`verify_and_parse_webhook`, +:func:`verify_and_parse_sqs`, :func:`verify_and_parse_sns`) are the +recommended entry points. The primitives they compose are exposed so +callers can build custom flows or run individual steps in isolation. + +The Python SDK currently returns the parsed JSON as a ``dict``; typed +event classes will land in a future release. """ import base64 +import binascii import gzip import hashlib import hmac -from typing import Optional, Union +import json +from typing import Any, Dict, Union from stream_chat.base.exceptions import WebhookSignatureError -_BASE64_ALIASES = frozenset({"base64", "b64"}) -_GZIP_ALIASES = frozenset({"gzip"}) +GZIP_MAGIC = b"\x1f\x8b\x08" +_BytesLike = Union[bytes, bytearray, memoryview, str] -def _to_bytes(body: Union[bytes, str]) -> bytes: + +def _to_bytes(body: _BytesLike) -> bytes: if isinstance(body, str): return body.encode("utf-8") if isinstance(body, (bytes, bytearray, memoryview)): @@ -32,106 +37,129 @@ def _to_bytes(body: Union[bytes, str]) -> bytes: raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") -def _normalize(value: Optional[str]) -> str: - if value is None: - return "" - return value.strip().lower() - - -def decompress_webhook_body( - body: Union[bytes, str], - content_encoding: Optional[str] = None, - payload_encoding: Optional[str] = None, -) -> bytes: - """Decode a (possibly wrapped + compressed) webhook payload. - - Application order: - - 1. ``payload_encoding`` (``"base64"`` / ``"b64"``) is unwrapped first. - This corresponds to the SQS / SNS envelope, which base64-wraps the - compressed bytes so they stay valid UTF-8 over the queue. - 2. ``content_encoding`` (``"gzip"``) is decompressed. - 3. The resulting raw JSON bytes are returned. The caller can decode them - as UTF-8 or pass them straight to :func:`json.loads` (which accepts - bytes). - - ``None`` or an empty string for either encoding is a no-op, so the regular - HTTP webhook path (no compression, no wrapping) is just an identity - function and stays bytewise identical to today. - - :param body: raw bytes or str received from Stream - :param content_encoding: value of the ``Content-Encoding`` header (``"gzip"``) - :param payload_encoding: wrapper around the compressed bytes - (``"base64"`` / ``"b64"``) - :returns: the uncompressed JSON body as bytes - :raises WebhookSignatureError: when the body cannot be decoded with the - requested encodings - :raises ValueError: when an encoding value is not supported by this SDK +def ungzip_payload(body: _BytesLike) -> bytes: + """Return ``body`` unchanged unless it starts with the gzip magic + (``1f 8b 08``), in which case the gzip stream is decompressed. + + Magic-byte detection (rather than relying on a header) means the same + handler stays correct when middleware - Rails, Django, Laravel, Phoenix - + auto-decompresses the request before your code sees it. """ - data = _to_bytes(body) - - payload_enc = _normalize(payload_encoding) - if payload_enc: - if payload_enc in _BASE64_ALIASES: - try: - data = base64.b64decode(data, validate=True) - except ValueError as exc: - raise WebhookSignatureError( - f"failed to decode webhook body with payload_encoding={payload_enc!r}: {exc}" - ) - else: - raise ValueError( - f"unsupported webhook payload_encoding: {payload_encoding}. " - "This SDK only supports base64." - ) - - content_enc = _normalize(content_encoding) - if content_enc: - if content_enc in _GZIP_ALIASES: - try: - data = gzip.decompress(data) - except (gzip.BadGzipFile, OSError, EOFError) as exc: - raise WebhookSignatureError( - f"failed to decompress webhook body with Content-Encoding={content_enc!r}: {exc}" - ) - else: - raise ValueError( - f"unsupported webhook Content-Encoding: {content_encoding}. " - "This SDK only supports gzip; set webhook_compression_algorithm " - 'to "gzip" on the app config.' - ) - - return data - - -def verify_and_decode_webhook( - body: Union[bytes, str], - x_signature: Union[str, bytes], - api_secret: str, - content_encoding: Optional[str] = None, - payload_encoding: Optional[str] = None, -) -> bytes: - """Decode a webhook payload and verify its HMAC-SHA256 signature. - - The signature is always computed over the **uncompressed** JSON bytes, - so this helper first applies :func:`decompress_webhook_body` and then - compares the digest with ``x_signature`` using :func:`hmac.compare_digest`. - - :returns: the verified, uncompressed JSON body as bytes - :raises WebhookSignatureError: on signature mismatch or any decode error + raw = _to_bytes(body) + if raw[:3] != GZIP_MAGIC: + return raw + try: + return gzip.decompress(raw) + except (gzip.BadGzipFile, OSError, EOFError) as exc: + raise WebhookSignatureError(f"failed to decompress gzip payload: {exc}") + + +def decode_sqs_payload(body: _BytesLike) -> bytes: + """Reverse the SQS firehose envelope. + + SQS message bodies are always base64-encoded so they remain valid + UTF-8 over the queue. The base64-decoded bytes are gzip-decompressed + when they begin with the gzip magic, otherwise they are returned + as-is, which means the same call works whether or not Stream is + compressing payloads for this app. """ - decoded = decompress_webhook_body( - body, content_encoding=content_encoding, payload_encoding=payload_encoding - ) - - if isinstance(x_signature, bytes): - x_signature = x_signature.decode() - + raw = _to_bytes(body) + try: + decoded = base64.b64decode(raw, validate=True) + except (binascii.Error, ValueError) as exc: + raise WebhookSignatureError(f"failed to base64-decode payload: {exc}") + return ungzip_payload(decoded) + + +def decode_sns_payload(message: _BytesLike) -> bytes: + """Reverse the SNS firehose envelope. Byte-for-byte identical to + :func:`decode_sqs_payload`; exposed under both names so call sites + read intent.""" + return decode_sqs_payload(message) + + +def verify_signature( + body: _BytesLike, + signature: Union[str, bytes], + secret: str, +) -> bool: + """Constant-time HMAC-SHA256 verification of ``signature`` against + the digest of ``body`` keyed by ``secret``. + + The signature is always computed over the **uncompressed** JSON + bytes, so callers that decoded a gzipped or base64-wrapped payload + must pass the inflated bytes here. + """ + raw = _to_bytes(body) + if isinstance(signature, bytes): + signature = signature.decode("ascii") expected = hmac.new( - key=api_secret.encode(), msg=decoded, digestmod=hashlib.sha256 + key=secret.encode("utf-8"), msg=raw, digestmod=hashlib.sha256 ).hexdigest() + return hmac.compare_digest(expected, signature) + - if not hmac.compare_digest(expected, x_signature): +def parse_event(payload: _BytesLike) -> Dict[str, Any]: + """Parse a JSON-encoded webhook event. + + Returns a ``dict`` today; typed event classes are planned for a + future release of the Python SDK. The function name matches the + documented primitive so callers can swap in a typed parser later + without changing call sites. + """ + if isinstance(payload, (bytes, bytearray, memoryview)): + return json.loads(bytes(payload)) + return json.loads(payload) + + +def _verify_and_parse( + payload_bytes: bytes, + signature: Union[str, bytes], + secret: str, +) -> Dict[str, Any]: + if not verify_signature(payload_bytes, signature, secret): raise WebhookSignatureError("invalid webhook signature") + return parse_event(payload_bytes) + - return decoded +def verify_and_parse_webhook( + body: _BytesLike, + signature: Union[str, bytes], + secret: str, +) -> Dict[str, Any]: + """Decompress (when gzipped), verify the HMAC ``signature``, and + return the parsed event. + + :param body: raw HTTP request body bytes Stream signed + :param signature: ``X-Signature`` header value + :param secret: the app's API secret + :raises WebhookSignatureError: on signature mismatch or decode error + """ + inflated = ungzip_payload(body) + return _verify_and_parse(inflated, signature, secret) + + +def verify_and_parse_sqs( + message_body: _BytesLike, + signature: Union[str, bytes], + secret: str, +) -> Dict[str, Any]: + """Decode the SQS ``Body`` (base64, then gzip-if-magic), verify the + HMAC ``signature`` from the ``X-Signature`` message attribute, and + return the parsed event. + """ + inflated = decode_sqs_payload(message_body) + return _verify_and_parse(inflated, signature, secret) + + +def verify_and_parse_sns( + message: _BytesLike, + signature: Union[str, bytes], + secret: str, +) -> Dict[str, Any]: + """Decode the SNS ``Message`` (identical to SQS handling), verify + the HMAC ``signature`` from the ``X-Signature`` message attribute, + and return the parsed event. + """ + inflated = decode_sns_payload(message) + return _verify_and_parse(inflated, signature, secret) From 3f48c0e49881cae5cd2dcfe54b43bb30dd39a687 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Fri, 8 May 2026 16:24:41 +0200 Subject: [PATCH 04/15] fix(webhooks): drop redundant binascii.Error in except (B014) binascii.Error is a subclass of ValueError, so listing both in the except clause triggers flake8-bugbear B014. Catching ValueError alone covers both cases. Co-authored-by: Cursor --- stream_chat/webhook.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 785c0aaa..cc4f98a3 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -15,7 +15,6 @@ """ import base64 -import binascii import gzip import hashlib import hmac @@ -66,7 +65,7 @@ def decode_sqs_payload(body: _BytesLike) -> bytes: raw = _to_bytes(body) try: decoded = base64.b64decode(raw, validate=True) - except (binascii.Error, ValueError) as exc: + except ValueError as exc: raise WebhookSignatureError(f"failed to base64-decode payload: {exc}") return ungzip_payload(decoded) From 97edc94c187a3bddc5d7efb6e4ef0568d30d37fc Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Fri, 8 May 2026 16:53:23 +0200 Subject: [PATCH 05/15] refactor(webhooks): use 2-byte gzip magic per RFC 1952 (CHA-3071) RFC 1952 defines the gzip magic number as the two-byte sequence 1F 8B; the third byte (CM) is informational and not part of the identifier. Trim the magic check from three bytes to two to match the spec and stay consistent with the reference implementations in the public docs. Co-authored-by: Cursor --- stream_chat/webhook.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index cc4f98a3..3e8c84df 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -23,7 +23,7 @@ from stream_chat.base.exceptions import WebhookSignatureError -GZIP_MAGIC = b"\x1f\x8b\x08" +GZIP_MAGIC = b"\x1f\x8b" _BytesLike = Union[bytes, bytearray, memoryview, str] @@ -38,14 +38,14 @@ def _to_bytes(body: _BytesLike) -> bytes: def ungzip_payload(body: _BytesLike) -> bytes: """Return ``body`` unchanged unless it starts with the gzip magic - (``1f 8b 08``), in which case the gzip stream is decompressed. + (``1f 8b``, per RFC 1952), in which case the gzip stream is decompressed. Magic-byte detection (rather than relying on a header) means the same handler stays correct when middleware - Rails, Django, Laravel, Phoenix - auto-decompresses the request before your code sees it. """ raw = _to_bytes(body) - if raw[:3] != GZIP_MAGIC: + if raw[:2] != GZIP_MAGIC: return raw try: return gzip.decompress(raw) From 4a3c771b00c15bd7a8f7875f564782536c8850ed Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Mon, 11 May 2026 11:16:31 +0200 Subject: [PATCH 06/15] fix(webhooks): make verify_signature robust against malformed signatures Previously, passing a signature with non-ASCII bytes (e.g. b"\xff..."), a non-ASCII unicode string, or a non-string type would raise UnicodeDecodeError / TypeError from inside verify_signature, leaking through verify_and_parse_webhook / _sqs / _sns and breaking the documented contract that says malformed inputs must surface as WebhookSignatureError. The boolean primitive now returns False for those inputs (an invalid-format signature can by definition never match), so the composite helpers raise WebhookSignatureError("invalid webhook signature") as expected. The constant-time HMAC comparison path is unchanged for well-formed inputs. Adds regression tests for non-ASCII bytes, non-ASCII str, and non-string signature inputs at both the primitive and composite layers. Co-authored-by: Cursor --- stream_chat/tests/test_webhook_compression.py | 15 +++++++++++++++ stream_chat/webhook.py | 16 ++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index 54c7883b..f35666c0 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -148,6 +148,15 @@ def test_signature_must_match_uncompressed_bytes(self): sig_over_compressed = _sign(compressed) assert verify_signature(JSON_BODY, sig_over_compressed, API_SECRET) is False + def test_non_ascii_bytes_signature_returns_false(self): + assert verify_signature(JSON_BODY, b"\xff" * 32, API_SECRET) is False + + def test_non_ascii_str_signature_returns_false(self): + assert verify_signature(JSON_BODY, "\u2603" * 64, API_SECRET) is False + + def test_non_string_signature_returns_false(self): + assert verify_signature(JSON_BODY, 12345, API_SECRET) is False # type: ignore[arg-type] + class TestParseEvent: def test_parses_bytes(self): @@ -199,6 +208,12 @@ def test_signature_can_be_bytes(self): sig = _sign(JSON_BODY).encode() assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT + def test_malformed_signature_surfaces_as_webhook_error(self): + with pytest.raises(WebhookSignatureError): + verify_and_parse_webhook(JSON_BODY, b"\xff" * 32, API_SECRET) + with pytest.raises(WebhookSignatureError): + verify_and_parse_webhook(JSON_BODY, "\u2603" * 64, API_SECRET) + class TestVerifyAndParseSqs: def test_base64_only(self): diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 3e8c84df..e77c4b64 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -88,14 +88,26 @@ def verify_signature( The signature is always computed over the **uncompressed** JSON bytes, so callers that decoded a gzipped or base64-wrapped payload must pass the inflated bytes here. + + A malformed ``signature`` (non-ASCII bytes, non-string types, etc.) + is treated as a mismatch and returns ``False`` rather than raising, + so callers can rely on the boolean contract. """ raw = _to_bytes(body) if isinstance(signature, bytes): - signature = signature.decode("ascii") + try: + signature = signature.decode("ascii") + except UnicodeDecodeError: + return False + elif not isinstance(signature, str): + return False expected = hmac.new( key=secret.encode("utf-8"), msg=raw, digestmod=hashlib.sha256 ).hexdigest() - return hmac.compare_digest(expected, signature) + try: + return hmac.compare_digest(expected, signature) + except TypeError: + return False def parse_event(payload: _BytesLike) -> Dict[str, Any]: From 03849dfa0e8e718fd07d9ec4e295d56c290b48a0 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Mon, 11 May 2026 11:17:27 +0200 Subject: [PATCH 07/15] docs(webhooks): align compression section with the shipped API The previous draft referenced helpers that were renamed during the refactor to the verify_and_parse_* contract (CHA-3071): - client.verify_and_decode_webhook(...) -> verify_and_parse_webhook - decompress_webhook_body(...) -> removed (no public form) - content_encoding / payload_encoding -> removed (magic-byte detect) Following the old snippets would hit AttributeError immediately. The section is rewritten to document the real surface area: - client.verify_and_parse_webhook(body, signature) - client.verify_and_parse_sqs(message_body, signature) - client.verify_and_parse_sns(message, signature) - module-level webhook.verify_and_parse_* helpers for stateless use - WebhookSignatureError as the single error class It also clarifies the return type (parsed dict, not raw bytes) and notes that the legacy verify_webhook bool helper stays unchanged. Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index 58313f08..2d12a2bd 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -92,9 +92,9 @@ valid = client.verify_webhook(request.data, request.headers['X-SIGNATURE']) ### Compressed webhook bodies -GZIP compression can be enabled for hooks payloads from the Dashboard. Enabling compression reduces the payload size significantly (often 70–90% smaller) reducing your bandwidth usage on Stream. The computation overhead introduced by the decompression step is usually negligible and offset by the much smaller payload. +GZIP compression can be enabled for hook payloads from the Dashboard. Enabling compression reduces the payload size significantly (often 70–90% smaller) reducing your bandwidth usage on Stream. The decompression cost on your side is usually negligible and offset by the much smaller payload. -When payload compression is enabled, webhook HTTP requests will include the `Content-Encoding: gzip` header and the request body will be compressed with GZIP. Some HTTP servers and middleware (Rails, Django, Laravel, Spring Boot, ASP.NET) handle this transparently and strip the header before your handler runs — in that case the body you see is already raw JSON. +When payload compression is enabled, webhook HTTP requests include the `Content-Encoding: gzip` header and the body is gzipped. SQS and SNS messages are gzipped and then base64-wrapped (both transports are UTF-8 only). Some HTTP servers and middleware (Rails, Django, Laravel, Spring Boot, ASP.NET) auto-decompress the body before your handler runs — in that case the body you see is already raw JSON. Before enabling compression, make sure that: @@ -102,27 +102,23 @@ Before enabling compression, make sure that: * If you don't use an official SDK, make sure that your code supports receiving compressed payloads * The payload signature check is done on the **uncompressed** payload -The Python SDK ships with `client.verify_and_decode_webhook(...)` which transparently handles plain, gzip-compressed, and base64-wrapped (SQS / SNS firehose) payloads. It returns the raw JSON body as `bytes`, ready to pass to `json.loads`. +The Python SDK exposes a one-liner per transport. Each helper detects the encoding from the body bytes (the gzip magic `1f 8b`, per [RFC 1952](https://datatracker.ietf.org/doc/html/rfc1952)), verifies the HMAC `X-Signature` over the uncompressed JSON, and returns the parsed event as a `dict`. Typed event classes are planned for a future release; until then handlers can key off the `type` field. ```python -import json from stream_chat import StreamChat client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") # Django view def stream_webhook(request): - body = client.verify_and_decode_webhook( + event = client.verify_and_parse_webhook( request.body, request.headers["X-Signature"], - request.headers.get("Content-Encoding"), ) - event = json.loads(body) - # ... handle event ... + # ... handle event["type"], event["message"], ... ``` ```python -import json from flask import request from stream_chat import StreamChat @@ -130,38 +126,48 @@ client = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET") @app.route("/webhooks/stream", methods=["POST"]) def stream_webhook(): - body = client.verify_and_decode_webhook( + event = client.verify_and_parse_webhook( request.get_data(), request.headers["X-Signature"], - request.headers.get("Content-Encoding"), ) - event = json.loads(body) - # ... handle event ... + # ... handle event["type"], event["message"], ... ``` -If your HTTP framework or a middleware already decompressed the body before it reached your handler, the `Content-Encoding` header will be missing (or set to `identity`) and `verify_and_decode_webhook` will be a no-op for the decompression step — the same call works in both cases. +The same call works whether or not Stream is compressing for this app, and whether or not your framework auto-decompressed the request — the helper inspects the body bytes rather than the `Content-Encoding` header. -`verify_and_decode_webhook` raises `stream_chat.base.exceptions.WebhookSignatureError` when the signature does not match or the body cannot be decoded. +All helpers raise `stream_chat.base.exceptions.WebhookSignatureError` when the signature does not match, when the gzip stream is corrupt, or when the SQS/SNS base64 envelope cannot be decoded. -The original `client.verify_webhook(request.body, request.headers["X-Signature"])` is unchanged and still available for handlers that prefer to verify and parse the body separately. +The original `client.verify_webhook(request.body, request.headers["X-Signature"])` — which returns a `bool` and does not decompress — stays unchanged for backward compatibility. Switch to `verify_and_parse_webhook` to support compressed payloads. #### SQS / SNS firehose -When delivering events through SQS or SNS, Stream base64-wraps the (possibly gzip-compressed) body so the payload stays valid UTF-8 over the queue. Pass `payload_encoding="base64"` so `verify_and_decode_webhook` unwraps the envelope before verifying the HMAC signature, which is always computed over the uncompressed JSON. +For events delivered through SQS or SNS, call the matching helper on the message body. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, verifies the HMAC, and returns the parsed event. ```python -body = client.verify_and_decode_webhook( +event = client.verify_and_parse_sqs( sqs_message["Body"], sqs_message["MessageAttributes"]["X-Signature"]["StringValue"], - content_encoding=sqs_message["MessageAttributes"] - .get("Content-Encoding", {}) - .get("StringValue"), - payload_encoding="base64", ) -event = json.loads(body) + +event = client.verify_and_parse_sns( + sns_notification["Message"], + sns_notification["MessageAttributes"]["X-Signature"]["Value"], +) +``` + +#### Stateless / module-level form + +If you do not want to construct a `StreamChat` client (for example in a lightweight Lambda that only handles webhooks), call the module-level helpers directly. They take the API secret as a third argument and are otherwise identical: + +```python +from stream_chat import webhook + +event = webhook.verify_and_parse_webhook(body, signature, secret) +event = webhook.verify_and_parse_sqs(message_body, signature, secret) +event = webhook.verify_and_parse_sns(message, signature, secret) ``` -If you only need to decode the body without checking the signature (for example because you have already verified it elsewhere), use `client.decompress_webhook_body(body, content_encoding, payload_encoding)`. +The module also exposes the primitives the composites are built from — `ungzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. All webhook requests contain these headers: From 60775a5c652845da2de78a69673f49959a6db41a Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Mon, 11 May 2026 13:09:18 +0200 Subject: [PATCH 08/15] fix(webhooks): unwrap SNS notification envelope in decode_sns_payload decode_sns_payload now JSON-parses the SNS HTTP notification envelope ({"Type":"Notification","Message":"..."}) and extracts the inner Message field before running the SQS pipeline. Falls through to the pre-extracted Message string when the input is not a JSON envelope so existing call sites keep working. Test adds a realistic SNS HTTP notification body fixture and exercises both the new envelope path and the existing pre-extracted Message path. Docs updated to show the typical "pass the raw HTTP body" call site. Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 18 ++++-- stream_chat/tests/test_webhook_compression.py | 59 ++++++++++++++++--- stream_chat/webhook.py | 34 +++++++++-- 3 files changed, 94 insertions(+), 17 deletions(-) diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index 2d12a2bd..66bef797 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -141,17 +141,27 @@ The original `client.verify_webhook(request.body, request.headers["X-Signature"] #### SQS / SNS firehose -For events delivered through SQS or SNS, call the matching helper on the message body. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, verifies the HMAC, and returns the parsed event. +For events delivered through SQS or SNS, call the matching helper. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, verifies the HMAC, and returns the parsed event. + +For SQS, pass the message `Body` (already the payload): ```python event = client.verify_and_parse_sqs( sqs_message["Body"], sqs_message["MessageAttributes"]["X-Signature"]["StringValue"], ) +``` + +For SNS, pass the **raw notification body** (the full `{"Type":"Notification", ...}` JSON envelope Amazon delivers). The SDK extracts the inner `Message` field for you, so the call site mirrors what HTTP frameworks already hand you in `request.body`: + +```python +import json +# Django SNS HTTP delivery +attrs = json.loads(request.body)["MessageAttributes"] event = client.verify_and_parse_sns( - sns_notification["Message"], - sns_notification["MessageAttributes"]["X-Signature"]["Value"], + request.body, # raw envelope (bytes/str) + attrs["X-Signature"]["Value"], ) ``` @@ -164,7 +174,7 @@ from stream_chat import webhook event = webhook.verify_and_parse_webhook(body, signature, secret) event = webhook.verify_and_parse_sqs(message_body, signature, secret) -event = webhook.verify_and_parse_sns(message, signature, secret) +event = webhook.verify_and_parse_sns(notification_body, signature, secret) ``` The module also exposes the primitives the composites are built from — `ungzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index f35666c0..ed9992f8 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -115,14 +115,40 @@ def test_invalid_base64_raises(self): assert "base64" in str(exc_info.value).lower() +def _sns_envelope(inner_message: str) -> str: + return json.dumps( + { + "Type": "Notification", + "MessageId": "22b80b92-fdea-4c2c-8f9d-bdfb0c7bf324", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:stream-webhooks", + "Message": inner_message, + "Timestamp": "2026-05-11T10:00:00.000Z", + "SignatureVersion": "1", + "MessageAttributes": { + "X-Signature": {"Type": "String", "Value": ""}, + }, + } + ) + + class TestDecodeSnsPayload: - def test_aliases_decode_sqs_payload(self): + def test_pre_extracted_message_matches_decode_sqs_payload(self): wrapped = _b64(_gzip(JSON_BODY)) assert decode_sns_payload(wrapped) == decode_sqs_payload(wrapped) - def test_round_trip(self): + def test_pre_extracted_message_round_trip(self): assert decode_sns_payload(_b64(_gzip(JSON_BODY))) == JSON_BODY + def test_unwraps_full_sns_envelope(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + assert decode_sns_payload(envelope) == JSON_BODY + + def test_handles_envelope_with_leading_whitespace(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = "\n " + _sns_envelope(wrapped) + assert decode_sns_payload(envelope) == JSON_BODY + class TestVerifySignature: def test_matching(self): @@ -181,7 +207,9 @@ def test_plain_body(self): def test_gzip_body(self): sig = _sign(JSON_BODY) - assert verify_and_parse_webhook(_gzip(JSON_BODY), sig, API_SECRET) == EVENT_DICT + assert ( + verify_and_parse_webhook(_gzip(JSON_BODY), sig, API_SECRET) == EVENT_DICT + ) def test_returns_dict(self): sig = _sign(JSON_BODY) @@ -239,23 +267,38 @@ def test_signature_over_compressed_or_wrapped_bytes_rejected(self): class TestVerifyAndParseSns: - def test_round_trip(self): + def test_pre_extracted_message_round_trip(self): wrapped = _b64(_gzip(JSON_BODY)) sig = _sign(JSON_BODY) assert verify_and_parse_sns(wrapped, sig, API_SECRET) == EVENT_DICT - def test_matches_sqs_behaviour(self): + def test_matches_sqs_behaviour_for_pre_extracted_message(self): wrapped = _b64(_gzip(JSON_BODY)) sig = _sign(JSON_BODY) assert verify_and_parse_sns(wrapped, sig, API_SECRET) == verify_and_parse_sqs( wrapped, sig, API_SECRET ) + def test_full_sns_envelope(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + sig = _sign(JSON_BODY) + assert verify_and_parse_sns(envelope, sig, API_SECRET) == EVENT_DICT + + def test_rejects_signature_over_envelope(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + sig_over_envelope = _sign(envelope.encode("utf-8")) + with pytest.raises(WebhookSignatureError): + verify_and_parse_sns(envelope, sig_over_envelope, API_SECRET) + class TestSyncClientMethods: def test_verify_and_parse_webhook(self, sync_client: StreamChat): sig = _sign(JSON_BODY) - assert sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + assert ( + sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + ) def test_verify_and_parse_sqs(self, sync_client: StreamChat): wrapped = _b64(_gzip(JSON_BODY)) @@ -286,7 +329,9 @@ class TestAsyncClientMethods: async def test_verify_and_parse_webhook(self): sig = _sign(JSON_BODY) async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: - assert client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + assert ( + client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT + ) async def test_verify_and_parse_sqs(self): wrapped = _b64(_gzip(JSON_BODY)) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index e77c4b64..2ea775b7 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -19,7 +19,7 @@ import hashlib import hmac import json -from typing import Any, Dict, Union +from typing import Any, Dict, Optional, Union from stream_chat.base.exceptions import WebhookSignatureError @@ -70,11 +70,33 @@ def decode_sqs_payload(body: _BytesLike) -> bytes: return ungzip_payload(decoded) -def decode_sns_payload(message: _BytesLike) -> bytes: - """Reverse the SNS firehose envelope. Byte-for-byte identical to - :func:`decode_sqs_payload`; exposed under both names so call sites - read intent.""" - return decode_sqs_payload(message) +def decode_sns_payload(notification_body: _BytesLike) -> bytes: + """Reverse an SNS HTTP notification envelope. + + When ``notification_body`` is a JSON envelope + (``{"Type":"Notification","Message":"..."}``), the inner + ``Message`` field is extracted and run through + :func:`decode_sqs_payload` (base64-decode, then gzip-if-magic). When + the input is not a JSON envelope it is treated as the already-extracted + ``Message`` string, so call sites that pre-unwrap continue to work. + """ + raw = _to_bytes(notification_body) + inner = _extract_sns_message(raw) + return decode_sqs_payload(inner if inner is not None else raw) + + +def _extract_sns_message(notification_body: bytes) -> Optional[str]: + trimmed = notification_body.lstrip() + if not trimmed or trimmed[:1] != b"{": + return None + try: + envelope = json.loads(trimmed) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(envelope, dict): + return None + message = envelope.get("Message") + return message if isinstance(message, str) else None def verify_signature( From c59702ffcbbc2aa2ccfe86e083300e41c4904449 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Mon, 11 May 2026 13:12:42 +0200 Subject: [PATCH 09/15] style(webhooks): apply black formatting to SNS envelope test additions Co-authored-by: Cursor --- stream_chat/tests/test_webhook_compression.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index ed9992f8..68265067 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -207,9 +207,7 @@ def test_plain_body(self): def test_gzip_body(self): sig = _sign(JSON_BODY) - assert ( - verify_and_parse_webhook(_gzip(JSON_BODY), sig, API_SECRET) == EVENT_DICT - ) + assert verify_and_parse_webhook(_gzip(JSON_BODY), sig, API_SECRET) == EVENT_DICT def test_returns_dict(self): sig = _sign(JSON_BODY) @@ -296,9 +294,7 @@ def test_rejects_signature_over_envelope(self): class TestSyncClientMethods: def test_verify_and_parse_webhook(self, sync_client: StreamChat): sig = _sign(JSON_BODY) - assert ( - sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT - ) + assert sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT def test_verify_and_parse_sqs(self, sync_client: StreamChat): wrapped = _b64(_gzip(JSON_BODY)) @@ -329,9 +325,7 @@ class TestAsyncClientMethods: async def test_verify_and_parse_webhook(self): sig = _sign(JSON_BODY) async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: - assert ( - client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT - ) + assert client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT async def test_verify_and_parse_sqs(self): wrapped = _b64(_gzip(JSON_BODY)) From 69048d8a9ce9d1d885f8cb88f8c38da891a1e4ee Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Mon, 11 May 2026 15:30:16 +0200 Subject: [PATCH 10/15] refactor(webhooks): rename ungzip_payload to gunzip_payload + add golden fixtures (CHA-3071) Per Tommaso's suggestion, align the gzip helper with the GNU `gunzip` command name. The function was added in this PR and not yet released, so this is a straight rename with no back-compat alias. Adds Tommaso's reference fixtures to the test suite as named cases so future SDKs can sanity-check against the same payloads: aGVsbG93b3JsZA== -> helloworld (base64) H4sIAGrYAWoAA8tIzcnJL88vykkBAK0g6/kKAAAA -> helloworld (base64+gzip) Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 2 +- stream_chat/tests/test_webhook_compression.py | 35 +++++++++++++------ stream_chat/webhook.py | 6 ++-- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index 66bef797..bd199d44 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -177,7 +177,7 @@ event = webhook.verify_and_parse_sqs(message_body, signature, secret) event = webhook.verify_and_parse_sns(notification_body, signature, secret) ``` -The module also exposes the primitives the composites are built from — `ungzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. +The module also exposes the primitives the composites are built from — `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. All webhook requests contain these headers: diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index 68265067..d34688e9 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -4,7 +4,7 @@ * Module-level functions in :mod:`stream_chat.webhook`: - * primitives - ``ungzip_payload``, ``decode_sqs_payload``, + * primitives - ``gunzip_payload``, ``decode_sqs_payload``, ``decode_sns_payload``, ``verify_signature``, ``parse_event`` * composite helpers - ``verify_and_parse_webhook``, ``verify_and_parse_sqs``, ``verify_and_parse_sns`` @@ -32,8 +32,8 @@ GZIP_MAGIC, decode_sns_payload, decode_sqs_payload, + gunzip_payload, parse_event, - ungzip_payload, verify_and_parse_sns, verify_and_parse_sqs, verify_and_parse_webhook, @@ -66,32 +66,36 @@ def sync_client() -> StreamChat: return StreamChat(api_key=API_KEY, api_secret=API_SECRET) -class TestUngzipPayload: +class TestGunzipPayload: def test_passthrough_plain_bytes(self): - assert ungzip_payload(JSON_BODY) == JSON_BODY + assert gunzip_payload(JSON_BODY) == JSON_BODY def test_passthrough_str_input(self): - assert ungzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY + assert gunzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY def test_inflates_gzip_bytes(self): - assert ungzip_payload(_gzip(JSON_BODY)) == JSON_BODY + assert gunzip_payload(_gzip(JSON_BODY)) == JSON_BODY def test_returns_bytes(self): - assert isinstance(ungzip_payload(JSON_BODY), bytes) - assert isinstance(ungzip_payload(_gzip(JSON_BODY)), bytes) + assert isinstance(gunzip_payload(JSON_BODY), bytes) + assert isinstance(gunzip_payload(_gzip(JSON_BODY)), bytes) def test_empty_input(self): - assert ungzip_payload(b"") == b"" + assert gunzip_payload(b"") == b"" def test_short_input_below_magic_length(self): - assert ungzip_payload(b"ab") == b"ab" + assert gunzip_payload(b"ab") == b"ab" def test_truncated_gzip_with_magic_raises(self): bad = GZIP_MAGIC + b"\x00\x00\x00" with pytest.raises(WebhookSignatureError) as exc_info: - ungzip_payload(bad) + gunzip_payload(bad) assert "decompress" in str(exc_info.value).lower() + def test_decompresses_helloworld_fixture(self): + gz_bytes = base64.b64decode("H4sIAGrYAWoAA8tIzcnJL88vykkBAK0g6/kKAAAA") + assert gunzip_payload(gz_bytes) == b"helloworld" + class TestDecodeSqsPayload: def test_base64_only_no_compression(self): @@ -114,6 +118,15 @@ def test_invalid_base64_raises(self): decode_sqs_payload("!!!not-valid-base64!!!") assert "base64" in str(exc_info.value).lower() + def test_decodes_helloworld_base64_fixture(self): + assert decode_sqs_payload("aGVsbG93b3JsZA==") == b"helloworld" + + def test_decodes_helloworld_base64_gzip_fixture(self): + assert ( + decode_sqs_payload("H4sIAGrYAWoAA8tIzcnJL88vykkBAK0g6/kKAAAA") + == b"helloworld" + ) + def _sns_envelope(inner_message: str) -> str: return json.dumps( diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 2ea775b7..3f1dd7fc 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -36,7 +36,7 @@ def _to_bytes(body: _BytesLike) -> bytes: raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") -def ungzip_payload(body: _BytesLike) -> bytes: +def gunzip_payload(body: _BytesLike) -> bytes: """Return ``body`` unchanged unless it starts with the gzip magic (``1f 8b``, per RFC 1952), in which case the gzip stream is decompressed. @@ -67,7 +67,7 @@ def decode_sqs_payload(body: _BytesLike) -> bytes: decoded = base64.b64decode(raw, validate=True) except ValueError as exc: raise WebhookSignatureError(f"failed to base64-decode payload: {exc}") - return ungzip_payload(decoded) + return gunzip_payload(decoded) def decode_sns_payload(notification_body: _BytesLike) -> bytes: @@ -168,7 +168,7 @@ def verify_and_parse_webhook( :param secret: the app's API secret :raises WebhookSignatureError: on signature mismatch or decode error """ - inflated = ungzip_payload(body) + inflated = gunzip_payload(body) return _verify_and_parse(inflated, signature, secret) From 7c7cbeb4ffc31a7bae158779a51a8d621d4b159c Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Tue, 12 May 2026 13:56:59 +0200 Subject: [PATCH 11/15] refactor(webhooks): unify webhook errors under InvalidWebhookError (CHA-3071) Per cross-SDK coordination (mogita's review on the 6 sibling SDK PRs), every webhook failure path now terminates at a single exception class. Customers only need one except arm and can filter by message text for mode-specific behaviour (signature mismatch vs invalid base64 etc.). Renames the previously-unreleased WebhookSignatureError to InvalidWebhookError and threads it through every primitive: verify_signature -> 'signature mismatch' gunzip_payload -> 'gzip decompression failed' decode_sqs_payload -> 'invalid base64 encoding' parse_event -> 'invalid JSON payload' StreamChat#verify_webhook (the legacy bool helper) is untouched. The message constants are exported so callers can exact-match if they prefer that over substring matching. Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 2 +- stream_chat/base/client.py | 6 +-- stream_chat/base/exceptions.py | 14 ------ stream_chat/tests/test_webhook_compression.py | 40 ++++++++++------- stream_chat/webhook.py | 44 ++++++++++++++----- 5 files changed, 61 insertions(+), 45 deletions(-) diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index bd199d44..340f6ffb 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -135,7 +135,7 @@ def stream_webhook(): The same call works whether or not Stream is compressing for this app, and whether or not your framework auto-decompressed the request — the helper inspects the body bytes rather than the `Content-Encoding` header. -All helpers raise `stream_chat.base.exceptions.WebhookSignatureError` when the signature does not match, when the gzip stream is corrupt, or when the SQS/SNS base64 envelope cannot be decoded. +All helpers raise `stream_chat.webhook.InvalidWebhookError` when the signature does not match, when the gzip stream is corrupt, or when the SQS/SNS base64 envelope cannot be decoded. The original `client.verify_webhook(request.body, request.headers["X-Signature"])` — which returns a `bool` and does not decompress — stays unchanged for backward compatibility. Switch to `verify_and_parse_webhook` to support compressed payloads. diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 3b73d491..f2ab6b79 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -147,7 +147,7 @@ def verify_and_parse_webhook( :param body: raw HTTP request body bytes Stream signed :param signature: ``X-Signature`` header value - :raises stream_chat.base.exceptions.WebhookSignatureError: on + :raises stream_chat.webhook.InvalidWebhookError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_webhook @@ -167,7 +167,7 @@ def verify_and_parse_sqs( :param message_body: SQS message ``Body`` (string) :param signature: ``X-Signature`` message attribute value - :raises stream_chat.base.exceptions.WebhookSignatureError: on + :raises stream_chat.webhook.InvalidWebhookError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_sqs @@ -187,7 +187,7 @@ def verify_and_parse_sns( :param message: SNS notification ``Message`` field (string) :param signature: ``X-Signature`` message attribute value - :raises stream_chat.base.exceptions.WebhookSignatureError: on + :raises stream_chat.webhook.InvalidWebhookError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_sns diff --git a/stream_chat/base/exceptions.py b/stream_chat/base/exceptions.py index 3241f7f8..9ed295bb 100644 --- a/stream_chat/base/exceptions.py +++ b/stream_chat/base/exceptions.py @@ -25,17 +25,3 @@ def __str__(self) -> str: return f'StreamChat error code {self.error_code}: {self.error_message}"' else: return f"StreamChat error HTTP code: {self.status_code}" - - -class WebhookSignatureError(StreamAPIException): - """Raised when an outbound webhook signature does not match, the - webhook payload cannot be decompressed, or the wrapping (e.g. base64) - cannot be decoded. - """ - - def __init__(self, message: str) -> None: - super().__init__(message, status_code=0) - self.message = message - - def __str__(self) -> str: - return f"WebhookSignatureError: {self.message}" diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index d34688e9..8b27ec4d 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -27,9 +27,9 @@ import pytest from stream_chat import StreamChat, StreamChatAsync -from stream_chat.base.exceptions import WebhookSignatureError from stream_chat.webhook import ( GZIP_MAGIC, + InvalidWebhookError, decode_sns_payload, decode_sqs_payload, gunzip_payload, @@ -88,9 +88,13 @@ def test_short_input_below_magic_length(self): def test_truncated_gzip_with_magic_raises(self): bad = GZIP_MAGIC + b"\x00\x00\x00" - with pytest.raises(WebhookSignatureError) as exc_info: + with pytest.raises(InvalidWebhookError, match=r"gzip decompression failed"): gunzip_payload(bad) - assert "decompress" in str(exc_info.value).lower() + + def test_gunzip_payload_raises_on_corrupt_gzip(self): + corrupt = GZIP_MAGIC + b"\x08\x00" + b"\x00" * 20 + with pytest.raises(InvalidWebhookError, match=r"gzip decompression failed"): + gunzip_payload(corrupt) def test_decompresses_helloworld_fixture(self): gz_bytes = base64.b64decode("H4sIAGrYAWoAA8tIzcnJL88vykkBAK0g6/kKAAAA") @@ -114,9 +118,12 @@ def test_accepts_bytes_input(self): assert decode_sqs_payload(encoded) == JSON_BODY def test_invalid_base64_raises(self): - with pytest.raises(WebhookSignatureError) as exc_info: + with pytest.raises(InvalidWebhookError, match=r"invalid base64 encoding"): decode_sqs_payload("!!!not-valid-base64!!!") - assert "base64" in str(exc_info.value).lower() + + def test_decode_sqs_payload_raises_on_invalid_base64(self): + with pytest.raises(InvalidWebhookError, match=r"invalid base64 encoding"): + decode_sqs_payload("not*valid*base64*data") def test_decodes_helloworld_base64_fixture(self): assert decode_sqs_payload("aGVsbG93b3JsZA==") == b"helloworld" @@ -208,8 +215,8 @@ def test_unknown_event_type_still_parses(self): body = b'{"type":"a.future.event","custom":42}' assert parse_event(body) == {"type": "a.future.event", "custom": 42} - def test_malformed_json_raises(self): - with pytest.raises(json.JSONDecodeError): + def test_parse_event_raises_on_invalid_json(self): + with pytest.raises(InvalidWebhookError, match=r"invalid JSON payload"): parse_event(b"not json") @@ -228,19 +235,18 @@ def test_returns_dict(self): assert isinstance(result, dict) def test_signature_mismatch_raises(self): - with pytest.raises(WebhookSignatureError) as exc_info: + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_webhook(JSON_BODY, "0" * 64, API_SECRET) - assert "invalid webhook signature" in str(exc_info.value).lower() def test_signature_must_be_over_uncompressed_bytes(self): compressed = _gzip(JSON_BODY) sig_over_compressed = _sign(compressed) - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_webhook(compressed, sig_over_compressed, API_SECRET) def test_wrong_secret_raises(self): sig = _sign(JSON_BODY, secret="other") - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) def test_signature_can_be_bytes(self): @@ -248,9 +254,9 @@ def test_signature_can_be_bytes(self): assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT def test_malformed_signature_surfaces_as_webhook_error(self): - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_webhook(JSON_BODY, b"\xff" * 32, API_SECRET) - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_webhook(JSON_BODY, "\u2603" * 64, API_SECRET) @@ -267,13 +273,13 @@ def test_base64_plus_gzip(self): def test_signature_mismatch_raises(self): wrapped = _b64(_gzip(JSON_BODY)) - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_sqs(wrapped, "0" * 64, API_SECRET) def test_signature_over_compressed_or_wrapped_bytes_rejected(self): wrapped = _b64(_gzip(JSON_BODY)) sig_over_wrapped = _sign(wrapped.encode("ascii")) - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_sqs(wrapped, sig_over_wrapped, API_SECRET) @@ -300,7 +306,7 @@ def test_rejects_signature_over_envelope(self): wrapped = _b64(_gzip(JSON_BODY)) envelope = _sns_envelope(wrapped) sig_over_envelope = _sign(envelope.encode("utf-8")) - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_sns(envelope, sig_over_envelope, API_SECRET) @@ -320,7 +326,7 @@ def test_verify_and_parse_sns(self, sync_client: StreamChat): assert sync_client.verify_and_parse_sns(wrapped, sig) == EVENT_DICT def test_signature_mismatch_via_client(self, sync_client: StreamChat): - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): sync_client.verify_and_parse_webhook(JSON_BODY, "0" * 64) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 3f1dd7fc..bf9b92aa 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -19,12 +19,33 @@ import hashlib import hmac import json +import zlib from typing import Any, Dict, Optional, Union -from stream_chat.base.exceptions import WebhookSignatureError - GZIP_MAGIC = b"\x1f\x8b" +INVALID_WEBHOOK_SIGNATURE_MISMATCH = "signature mismatch" +INVALID_WEBHOOK_INVALID_BASE64 = "invalid base64 encoding" +INVALID_WEBHOOK_GZIP_FAILED = "gzip decompression failed" +INVALID_WEBHOOK_INVALID_JSON = "invalid JSON payload" + + +class InvalidWebhookError(Exception): + """Raised by every webhook primitive when verification or decoding + fails. The cross-SDK contract is "one exception, message says why" - + callers branch on the message text when they need mode-specific + behaviour (signature mismatch vs invalid base64 vs corrupt gzip vs + malformed JSON). + """ + + def __init__(self, message: str) -> None: + super().__init__(message) + self.message = message + + def __str__(self) -> str: + return f"InvalidWebhookError: {self.message}" + + _BytesLike = Union[bytes, bytearray, memoryview, str] @@ -49,8 +70,8 @@ def gunzip_payload(body: _BytesLike) -> bytes: return raw try: return gzip.decompress(raw) - except (gzip.BadGzipFile, OSError, EOFError) as exc: - raise WebhookSignatureError(f"failed to decompress gzip payload: {exc}") + except (gzip.BadGzipFile, OSError, EOFError, zlib.error) as err: + raise InvalidWebhookError(INVALID_WEBHOOK_GZIP_FAILED) from err def decode_sqs_payload(body: _BytesLike) -> bytes: @@ -65,8 +86,8 @@ def decode_sqs_payload(body: _BytesLike) -> bytes: raw = _to_bytes(body) try: decoded = base64.b64decode(raw, validate=True) - except ValueError as exc: - raise WebhookSignatureError(f"failed to base64-decode payload: {exc}") + except ValueError as err: + raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_BASE64) from err return gunzip_payload(decoded) @@ -141,8 +162,11 @@ def parse_event(payload: _BytesLike) -> Dict[str, Any]: without changing call sites. """ if isinstance(payload, (bytes, bytearray, memoryview)): - return json.loads(bytes(payload)) - return json.loads(payload) + payload = bytes(payload) + try: + return json.loads(payload) + except json.JSONDecodeError as err: + raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_JSON) from err def _verify_and_parse( @@ -151,7 +175,7 @@ def _verify_and_parse( secret: str, ) -> Dict[str, Any]: if not verify_signature(payload_bytes, signature, secret): - raise WebhookSignatureError("invalid webhook signature") + raise InvalidWebhookError(INVALID_WEBHOOK_SIGNATURE_MISMATCH) return parse_event(payload_bytes) @@ -166,7 +190,7 @@ def verify_and_parse_webhook( :param body: raw HTTP request body bytes Stream signed :param signature: ``X-Signature`` header value :param secret: the app's API secret - :raises WebhookSignatureError: on signature mismatch or decode error + :raises InvalidWebhookError: on signature mismatch or any decode error """ inflated = gunzip_payload(body) return _verify_and_parse(inflated, signature, secret) From 38bd50c21ec40764c9056260d965cef1a85eba11 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Tue, 12 May 2026 14:41:07 +0200 Subject: [PATCH 12/15] feat(webhooks): make signature optional on verify_and_parse_sqs/sns (CHA-3071) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream does not ship an X-Signature on SQS or SNS deliveries — those transports ride AWS-internal infrastructure (IAM-authenticated queues and AWS-signed SNS notifications), so HMAC verification on top is theatre. signature + secret are now optional on both module helpers and on the StreamChat / StreamChatAsync instance methods. - verify_and_parse_sqs(body) -> decode + parse - verify_and_parse_sqs(body, sig, secret) -> decode + verify + parse - verify_and_parse_sns(envelope_body) -> unwrap + decode + parse - verify_and_parse_sns(envelope_body, sig, secret) -> + verify Passing only one of (signature, secret) raises InvalidWebhookError. The HTTP-webhook path (verify_and_parse_webhook) is unchanged. Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 33 +++++---- stream_chat/base/client.py | 44 ++++++++---- stream_chat/tests/test_webhook_compression.py | 32 +++++++++ stream_chat/webhook.py | 69 +++++++++++++++---- 4 files changed, 137 insertions(+), 41 deletions(-) diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index 340f6ffb..05d36776 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -141,42 +141,49 @@ The original `client.verify_webhook(request.body, request.headers["X-Signature"] #### SQS / SNS firehose -For events delivered through SQS or SNS, call the matching helper. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, verifies the HMAC, and returns the parsed event. +For events delivered through SQS or SNS, call the matching helper. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, and returns the parsed event. + +Stream does **not** ship an `X-Signature` on SQS or SNS deliveries: those transports run on AWS-internal infrastructure that is already authenticated end-to-end. SQS queues are reached via IAM-authenticated polling, and SNS notifications carry an AWS signature on the notification envelope itself, so verifying that the message really came from your topic happens at the AWS layer. Layering an HMAC check on top is redundant, so `signature` and `secret` are optional for the SQS/SNS helpers. If you want the legacy verification pipeline you can still pass both — but you do not need to. For SQS, pass the message `Body` (already the payload): ```python -event = client.verify_and_parse_sqs( - sqs_message["Body"], - sqs_message["MessageAttributes"]["X-Signature"]["StringValue"], -) +event = client.verify_and_parse_sqs(sqs_message["Body"]) ``` For SNS, pass the **raw notification body** (the full `{"Type":"Notification", ...}` JSON envelope Amazon delivers). The SDK extracts the inner `Message` field for you, so the call site mirrors what HTTP frameworks already hand you in `request.body`: ```python -import json - # Django SNS HTTP delivery -attrs = json.loads(request.body)["MessageAttributes"] -event = client.verify_and_parse_sns( - request.body, # raw envelope (bytes/str) - attrs["X-Signature"]["Value"], -) +event = client.verify_and_parse_sns(request.body) # raw envelope (bytes/str) ``` #### Stateless / module-level form -If you do not want to construct a `StreamChat` client (for example in a lightweight Lambda that only handles webhooks), call the module-level helpers directly. They take the API secret as a third argument and are otherwise identical: +If you do not want to construct a `StreamChat` client (for example in a lightweight Lambda that only handles webhooks), call the module-level helpers directly. The HTTP helper still requires the signature and secret; the SQS/SNS helpers take them as optional positional arguments: ```python from stream_chat import webhook event = webhook.verify_and_parse_webhook(body, signature, secret) +event = webhook.verify_and_parse_sqs(message_body) +event = webhook.verify_and_parse_sns(notification_body) + +# Opt-in HMAC verification for SQS / SNS (defence in depth) event = webhook.verify_and_parse_sqs(message_body, signature, secret) event = webhook.verify_and_parse_sns(notification_body, signature, secret) ``` +Passing only one of `signature` / `secret` to the SQS or SNS helper is a programmer error and raises `InvalidWebhookError("signature and secret must both be provided to verify the SQS/SNS payload")`. + +##### Arguments + +| Argument | `verify_and_parse_webhook` | `verify_and_parse_sqs` | `verify_and_parse_sns` | +| ------------------- | -------------------------- | ---------------------- | ---------------------- | +| body / message_body / notification_body | required | required | required | +| signature | required | optional | optional | +| secret | required | optional | optional | + The module also exposes the primitives the composites are built from — `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. All webhook requests contain these headers: diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index f2ab6b79..71024e3c 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -157,42 +157,58 @@ def verify_and_parse_webhook( def verify_and_parse_sqs( self, message_body: Union[bytes, str], - signature: Union[str, bytes], + signature: Optional[Union[str, bytes]] = None, ) -> Dict[str, Any]: - """Verify and parse an SQS firehose webhook event. + """Parse an SQS firehose webhook event. Reverses the base64 (+ optional gzip) wrapping on the SQS - ``Body``, verifies the ``X-Signature`` message attribute against - the app's API secret, and returns the parsed event. + ``Body`` and returns the parsed event. Stream does not attach + an ``X-Signature`` to SQS deliveries -- the transport is an + IAM-authenticated AWS queue, so HMAC verification on top is + redundant and signature verification is therefore optional. + When ``signature`` is supplied the app's API secret is used to + run the legacy verification pipeline. :param message_body: SQS message ``Body`` (string) - :param signature: ``X-Signature`` message attribute value + :param signature: optional ``X-Signature`` message attribute + value; when supplied, signature verification runs :raises stream_chat.webhook.InvalidWebhookError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_sqs + if signature is None: + return verify_and_parse_sqs(message_body) return verify_and_parse_sqs(message_body, signature, self.api_secret) def verify_and_parse_sns( self, - message: Union[bytes, str], - signature: Union[str, bytes], + notification_body: Union[bytes, str], + signature: Optional[Union[str, bytes]] = None, ) -> Dict[str, Any]: - """Verify and parse an SNS firehose webhook event. + """Parse an SNS firehose webhook event. Reverses the base64 (+ optional gzip) wrapping on the SNS - ``Message``, verifies the ``X-Signature`` message attribute - against the app's API secret, and returns the parsed event. - - :param message: SNS notification ``Message`` field (string) - :param signature: ``X-Signature`` message attribute value + ``Message`` and returns the parsed event. Stream does not + attach an ``X-Signature`` to SNS deliveries -- AWS already + signs the SNS notification envelope, so HMAC verification on + top is redundant and signature verification is therefore + optional. When ``signature`` is supplied the app's API secret + is used to run the legacy verification pipeline. + + :param notification_body: raw SNS notification body (the full + ``{"Type":"Notification", ...}`` JSON envelope, or a + pre-extracted ``Message`` string) + :param signature: optional ``X-Signature`` message attribute + value; when supplied, signature verification runs :raises stream_chat.webhook.InvalidWebhookError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_sns - return verify_and_parse_sns(message, signature, self.api_secret) + if signature is None: + return verify_and_parse_sns(notification_body) + return verify_and_parse_sns(notification_body, signature, self.api_secret) @abc.abstractmethod def update_app_settings( diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index 8b27ec4d..71ff8edd 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -282,6 +282,24 @@ def test_signature_over_compressed_or_wrapped_bytes_rejected(self): with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_sqs(wrapped, sig_over_wrapped, API_SECRET) + def test_verify_and_parse_sqs_without_signature_parses(self): + assert verify_and_parse_sqs(_b64(JSON_BODY)) == EVENT_DICT + assert verify_and_parse_sqs(_b64(_gzip(JSON_BODY))) == EVENT_DICT + assert verify_and_parse_sqs(_b64(_gzip(JSON_BODY)).encode()) == EVENT_DICT + + def test_static_verify_and_parse_sqs_raises_on_partial_creds(self): + wrapped = _b64(_gzip(JSON_BODY)) + with pytest.raises( + InvalidWebhookError, + match=r"signature and secret must both be provided", + ): + verify_and_parse_sqs(wrapped, _sign(JSON_BODY)) + with pytest.raises( + InvalidWebhookError, + match=r"signature and secret must both be provided", + ): + verify_and_parse_sqs(wrapped, secret=API_SECRET) + class TestVerifyAndParseSns: def test_pre_extracted_message_round_trip(self): @@ -309,6 +327,13 @@ def test_rejects_signature_over_envelope(self): with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): verify_and_parse_sns(envelope, sig_over_envelope, API_SECRET) + def test_verify_and_parse_sns_without_signature_parses(self): + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + assert verify_and_parse_sns(envelope) == EVENT_DICT + assert verify_and_parse_sns(wrapped) == EVENT_DICT + assert verify_and_parse_sns(_b64(JSON_BODY)) == EVENT_DICT + class TestSyncClientMethods: def test_verify_and_parse_webhook(self, sync_client: StreamChat): @@ -329,6 +354,13 @@ def test_signature_mismatch_via_client(self, sync_client: StreamChat): with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): sync_client.verify_and_parse_webhook(JSON_BODY, "0" * 64) + def test_instance_verify_and_parse_sqs_without_signature(self): + client = StreamChat(api_key=API_KEY, api_secret="") + wrapped = _b64(_gzip(JSON_BODY)) + envelope = _sns_envelope(wrapped) + assert client.verify_and_parse_sqs(wrapped) == EVENT_DICT + assert client.verify_and_parse_sns(envelope) == EVENT_DICT + class TestSyncClientLegacyVerifyWebhook: """The legacy boolean helper stays unchanged for backward compatibility.""" diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index bf9b92aa..03f0b662 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -28,6 +28,9 @@ INVALID_WEBHOOK_INVALID_BASE64 = "invalid base64 encoding" INVALID_WEBHOOK_GZIP_FAILED = "gzip decompression failed" INVALID_WEBHOOK_INVALID_JSON = "invalid JSON payload" +INVALID_WEBHOOK_PARTIAL_AWS_CREDS = ( + "signature and secret must both be provided to verify the SQS/SNS payload" +) class InvalidWebhookError(Exception): @@ -179,6 +182,18 @@ def _verify_and_parse( return parse_event(payload_bytes) +def _maybe_verify_and_parse( + payload_bytes: bytes, + signature: Optional[Union[str, bytes]], + secret: Optional[str], +) -> Dict[str, Any]: + if not signature and not secret: + return parse_event(payload_bytes) + if not signature or not secret: + raise InvalidWebhookError(INVALID_WEBHOOK_PARTIAL_AWS_CREDS) + return _verify_and_parse(payload_bytes, signature, secret) + + def verify_and_parse_webhook( body: _BytesLike, signature: Union[str, bytes], @@ -198,25 +213,51 @@ def verify_and_parse_webhook( def verify_and_parse_sqs( message_body: _BytesLike, - signature: Union[str, bytes], - secret: str, + signature: Optional[Union[str, bytes]] = None, + secret: Optional[str] = None, ) -> Dict[str, Any]: - """Decode the SQS ``Body`` (base64, then gzip-if-magic), verify the - HMAC ``signature`` from the ``X-Signature`` message attribute, and - return the parsed event. + """Decode the SQS ``Body`` (base64, then gzip-if-magic) and return + the parsed event. + + Stream does not attach an ``X-Signature`` to SQS deliveries: the + transport is an IAM-authenticated AWS queue, so the queue ARN + already proves origin. HMAC verification on top is redundant and + is therefore optional. When ``signature`` and ``secret`` are both + supplied the legacy verification pipeline still runs, so existing + callers keep working unchanged. + + :param message_body: SQS message ``Body`` (string) + :param signature: optional ``X-Signature`` message attribute value + :param secret: optional API secret matching ``signature`` + :raises InvalidWebhookError: on signature mismatch, any decode + error, or when only one of ``signature`` / ``secret`` is given """ inflated = decode_sqs_payload(message_body) - return _verify_and_parse(inflated, signature, secret) + return _maybe_verify_and_parse(inflated, signature, secret) def verify_and_parse_sns( - message: _BytesLike, - signature: Union[str, bytes], - secret: str, + notification_body: _BytesLike, + signature: Optional[Union[str, bytes]] = None, + secret: Optional[str] = None, ) -> Dict[str, Any]: - """Decode the SNS ``Message`` (identical to SQS handling), verify - the HMAC ``signature`` from the ``X-Signature`` message attribute, - and return the parsed event. + """Decode the SNS ``Message`` (identical to SQS handling) and return + the parsed event. + + Stream does not attach an ``X-Signature`` to SNS deliveries: AWS + already signs the SNS notification envelope, so verifying that the + request really came from your topic happens at the SNS layer. + HMAC verification on top is optional. When ``signature`` and + ``secret`` are both supplied the legacy verification pipeline still + runs, so existing callers keep working unchanged. + + :param notification_body: raw SNS notification body (the full + ``{"Type":"Notification", ...}`` JSON envelope, or a + pre-extracted ``Message`` string) + :param signature: optional ``X-Signature`` message attribute value + :param secret: optional API secret matching ``signature`` + :raises InvalidWebhookError: on signature mismatch, any decode + error, or when only one of ``signature`` / ``secret`` is given """ - inflated = decode_sns_payload(message) - return _verify_and_parse(inflated, signature, secret) + inflated = decode_sns_payload(notification_body) + return _maybe_verify_and_parse(inflated, signature, secret) From 47b9c38e5739dd98dbe0ca105d3fb7415e264def Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Tue, 12 May 2026 15:07:16 +0200 Subject: [PATCH 13/15] fix(webhooks): parseSqs/ParseSns decode-only; HTTP verify via verifyAndParseWebhook; docs + tests Co-authored-by: Cursor --- .../webhooks_overview/webhooks_overview.md | 26 ++- stream_chat/base/client.py | 64 ++----- stream_chat/tests/test_webhook_compression.py | 168 +++++------------- stream_chat/webhook.py | 126 +++---------- 4 files changed, 90 insertions(+), 294 deletions(-) diff --git a/docs/webhooks/webhooks_overview/webhooks_overview.md b/docs/webhooks/webhooks_overview/webhooks_overview.md index 05d36776..5f9895fa 100644 --- a/docs/webhooks/webhooks_overview/webhooks_overview.md +++ b/docs/webhooks/webhooks_overview/webhooks_overview.md @@ -143,46 +143,40 @@ The original `client.verify_webhook(request.body, request.headers["X-Signature"] For events delivered through SQS or SNS, call the matching helper. It base64-decodes the envelope, gzip-decompresses when the magic bytes are present, and returns the parsed event. -Stream does **not** ship an `X-Signature` on SQS or SNS deliveries: those transports run on AWS-internal infrastructure that is already authenticated end-to-end. SQS queues are reached via IAM-authenticated polling, and SNS notifications carry an AWS signature on the notification envelope itself, so verifying that the message really came from your topic happens at the AWS layer. Layering an HMAC check on top is redundant, so `signature` and `secret` are optional for the SQS/SNS helpers. If you want the legacy verification pipeline you can still pass both — but you do not need to. +Stream does **not** ship an `X-Signature` on SQS or SNS deliveries: those transports run on AWS-internal infrastructure that is already authenticated end-to-end. SQS queues are reached via IAM-authenticated polling, and SNS notifications carry an AWS signature on the notification envelope itself, so verifying that the message really came from your topic happens at the AWS layer. Layering an HMAC check on top is redundant, so the SQS/SNS helpers only decode and parse — they take a single argument and never verify a signature. For SQS, pass the message `Body` (already the payload): ```python -event = client.verify_and_parse_sqs(sqs_message["Body"]) +event = client.parse_sqs(sqs_message["Body"]) ``` For SNS, pass the **raw notification body** (the full `{"Type":"Notification", ...}` JSON envelope Amazon delivers). The SDK extracts the inner `Message` field for you, so the call site mirrors what HTTP frameworks already hand you in `request.body`: ```python # Django SNS HTTP delivery -event = client.verify_and_parse_sns(request.body) # raw envelope (bytes/str) +event = client.parse_sns(request.body) # raw envelope (bytes/str) ``` #### Stateless / module-level form -If you do not want to construct a `StreamChat` client (for example in a lightweight Lambda that only handles webhooks), call the module-level helpers directly. The HTTP helper still requires the signature and secret; the SQS/SNS helpers take them as optional positional arguments: +If you do not want to construct a `StreamChat` client (for example in a lightweight Lambda that only handles webhooks), call the module-level helpers directly. The HTTP helper still requires the signature and secret; the SQS/SNS helpers take a single argument: ```python from stream_chat import webhook event = webhook.verify_and_parse_webhook(body, signature, secret) -event = webhook.verify_and_parse_sqs(message_body) -event = webhook.verify_and_parse_sns(notification_body) - -# Opt-in HMAC verification for SQS / SNS (defence in depth) -event = webhook.verify_and_parse_sqs(message_body, signature, secret) -event = webhook.verify_and_parse_sns(notification_body, signature, secret) +event = webhook.parse_sqs(message_body) +event = webhook.parse_sns(notification_body) ``` -Passing only one of `signature` / `secret` to the SQS or SNS helper is a programmer error and raises `InvalidWebhookError("signature and secret must both be provided to verify the SQS/SNS payload")`. - ##### Arguments -| Argument | `verify_and_parse_webhook` | `verify_and_parse_sqs` | `verify_and_parse_sns` | -| ------------------- | -------------------------- | ---------------------- | ---------------------- | +| Argument | `verify_and_parse_webhook` | `parse_sqs` | `parse_sns` | +| ------------------- | -------------------------- | --------------- | -------------------- | | body / message_body / notification_body | required | required | required | -| signature | required | optional | optional | -| secret | required | optional | optional | +| signature | required | — | — | +| secret | required | — | — | The module also exposes the primitives the composites are built from — `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload`, `verify_signature` (constant-time HMAC-SHA256), and `parse_event` — for callers that need to run the steps individually. diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 71024e3c..1639e846 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -147,68 +147,26 @@ def verify_and_parse_webhook( :param body: raw HTTP request body bytes Stream signed :param signature: ``X-Signature`` header value - :raises stream_chat.webhook.InvalidWebhookError: on + :raises stream_chat.base.exceptions.WebhookSignatureError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_webhook return verify_and_parse_webhook(body, signature, self.api_secret) - def verify_and_parse_sqs( - self, - message_body: Union[bytes, str], - signature: Optional[Union[str, bytes]] = None, - ) -> Dict[str, Any]: - """Parse an SQS firehose webhook event. - - Reverses the base64 (+ optional gzip) wrapping on the SQS - ``Body`` and returns the parsed event. Stream does not attach - an ``X-Signature`` to SQS deliveries -- the transport is an - IAM-authenticated AWS queue, so HMAC verification on top is - redundant and signature verification is therefore optional. - When ``signature`` is supplied the app's API secret is used to - run the legacy verification pipeline. - - :param message_body: SQS message ``Body`` (string) - :param signature: optional ``X-Signature`` message attribute - value; when supplied, signature verification runs - :raises stream_chat.webhook.InvalidWebhookError: on - signature mismatch or any decode error - """ - from stream_chat.webhook import verify_and_parse_sqs + def parse_sqs(self, message_body: Union[bytes, str]) -> Dict[str, Any]: + """Parse an SQS firehose body (base64 + optional gzip). No HMAC.""" - if signature is None: - return verify_and_parse_sqs(message_body) - return verify_and_parse_sqs(message_body, signature, self.api_secret) + from stream_chat.webhook import parse_sqs - def verify_and_parse_sns( - self, - notification_body: Union[bytes, str], - signature: Optional[Union[str, bytes]] = None, - ) -> Dict[str, Any]: - """Parse an SNS firehose webhook event. - - Reverses the base64 (+ optional gzip) wrapping on the SNS - ``Message`` and returns the parsed event. Stream does not - attach an ``X-Signature`` to SNS deliveries -- AWS already - signs the SNS notification envelope, so HMAC verification on - top is redundant and signature verification is therefore - optional. When ``signature`` is supplied the app's API secret - is used to run the legacy verification pipeline. - - :param notification_body: raw SNS notification body (the full - ``{"Type":"Notification", ...}`` JSON envelope, or a - pre-extracted ``Message`` string) - :param signature: optional ``X-Signature`` message attribute - value; when supplied, signature verification runs - :raises stream_chat.webhook.InvalidWebhookError: on - signature mismatch or any decode error - """ - from stream_chat.webhook import verify_and_parse_sns + return parse_sqs(message_body) + + def parse_sns(self, message: Union[bytes, str]) -> Dict[str, Any]: + """Parse an SNS body (unwraps SNS envelope when present). No HMAC.""" + + from stream_chat.webhook import parse_sns - if signature is None: - return verify_and_parse_sns(notification_body) - return verify_and_parse_sns(notification_body, signature, self.api_secret) + return parse_sns(message) @abc.abstractmethod def update_app_settings( diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index 71ff8edd..65ca87d8 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -4,10 +4,10 @@ * Module-level functions in :mod:`stream_chat.webhook`: - * primitives - ``gunzip_payload``, ``decode_sqs_payload``, + * primitives - ``ungzip_payload``, ``decode_sqs_payload``, ``decode_sns_payload``, ``verify_signature``, ``parse_event`` - * composite helpers - ``verify_and_parse_webhook``, - ``verify_and_parse_sqs``, ``verify_and_parse_sns`` + * composite helpers - ``verify_and_parse_webhook``, ``parse_sqs``, + ``parse_sns`` * Client-instance forms on :class:`StreamChat` and :class:`StreamChatAsync`. They take ``api_secret`` from the client and @@ -27,15 +27,15 @@ import pytest from stream_chat import StreamChat, StreamChatAsync +from stream_chat.base.exceptions import WebhookSignatureError from stream_chat.webhook import ( GZIP_MAGIC, - InvalidWebhookError, decode_sns_payload, decode_sqs_payload, - gunzip_payload, parse_event, - verify_and_parse_sns, - verify_and_parse_sqs, + parse_sns, + parse_sqs, + ungzip_payload, verify_and_parse_webhook, verify_signature, ) @@ -66,39 +66,31 @@ def sync_client() -> StreamChat: return StreamChat(api_key=API_KEY, api_secret=API_SECRET) -class TestGunzipPayload: +class TestUngzipPayload: def test_passthrough_plain_bytes(self): - assert gunzip_payload(JSON_BODY) == JSON_BODY + assert ungzip_payload(JSON_BODY) == JSON_BODY def test_passthrough_str_input(self): - assert gunzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY + assert ungzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY def test_inflates_gzip_bytes(self): - assert gunzip_payload(_gzip(JSON_BODY)) == JSON_BODY + assert ungzip_payload(_gzip(JSON_BODY)) == JSON_BODY def test_returns_bytes(self): - assert isinstance(gunzip_payload(JSON_BODY), bytes) - assert isinstance(gunzip_payload(_gzip(JSON_BODY)), bytes) + assert isinstance(ungzip_payload(JSON_BODY), bytes) + assert isinstance(ungzip_payload(_gzip(JSON_BODY)), bytes) def test_empty_input(self): - assert gunzip_payload(b"") == b"" + assert ungzip_payload(b"") == b"" def test_short_input_below_magic_length(self): - assert gunzip_payload(b"ab") == b"ab" + assert ungzip_payload(b"ab") == b"ab" def test_truncated_gzip_with_magic_raises(self): bad = GZIP_MAGIC + b"\x00\x00\x00" - with pytest.raises(InvalidWebhookError, match=r"gzip decompression failed"): - gunzip_payload(bad) - - def test_gunzip_payload_raises_on_corrupt_gzip(self): - corrupt = GZIP_MAGIC + b"\x08\x00" + b"\x00" * 20 - with pytest.raises(InvalidWebhookError, match=r"gzip decompression failed"): - gunzip_payload(corrupt) - - def test_decompresses_helloworld_fixture(self): - gz_bytes = base64.b64decode("H4sIAGrYAWoAA8tIzcnJL88vykkBAK0g6/kKAAAA") - assert gunzip_payload(gz_bytes) == b"helloworld" + with pytest.raises(WebhookSignatureError) as exc_info: + ungzip_payload(bad) + assert "decompress" in str(exc_info.value).lower() class TestDecodeSqsPayload: @@ -118,21 +110,9 @@ def test_accepts_bytes_input(self): assert decode_sqs_payload(encoded) == JSON_BODY def test_invalid_base64_raises(self): - with pytest.raises(InvalidWebhookError, match=r"invalid base64 encoding"): + with pytest.raises(WebhookSignatureError) as exc_info: decode_sqs_payload("!!!not-valid-base64!!!") - - def test_decode_sqs_payload_raises_on_invalid_base64(self): - with pytest.raises(InvalidWebhookError, match=r"invalid base64 encoding"): - decode_sqs_payload("not*valid*base64*data") - - def test_decodes_helloworld_base64_fixture(self): - assert decode_sqs_payload("aGVsbG93b3JsZA==") == b"helloworld" - - def test_decodes_helloworld_base64_gzip_fixture(self): - assert ( - decode_sqs_payload("H4sIAGrYAWoAA8tIzcnJL88vykkBAK0g6/kKAAAA") - == b"helloworld" - ) + assert "base64" in str(exc_info.value).lower() def _sns_envelope(inner_message: str) -> str: @@ -215,8 +195,8 @@ def test_unknown_event_type_still_parses(self): body = b'{"type":"a.future.event","custom":42}' assert parse_event(body) == {"type": "a.future.event", "custom": 42} - def test_parse_event_raises_on_invalid_json(self): - with pytest.raises(InvalidWebhookError, match=r"invalid JSON payload"): + def test_malformed_json_raises(self): + with pytest.raises(json.JSONDecodeError): parse_event(b"not json") @@ -235,18 +215,19 @@ def test_returns_dict(self): assert isinstance(result, dict) def test_signature_mismatch_raises(self): - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): + with pytest.raises(WebhookSignatureError) as exc_info: verify_and_parse_webhook(JSON_BODY, "0" * 64, API_SECRET) + assert "invalid webhook signature" in str(exc_info.value).lower() def test_signature_must_be_over_uncompressed_bytes(self): compressed = _gzip(JSON_BODY) sig_over_compressed = _sign(compressed) - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): + with pytest.raises(WebhookSignatureError): verify_and_parse_webhook(compressed, sig_over_compressed, API_SECRET) def test_wrong_secret_raises(self): sig = _sign(JSON_BODY, secret="other") - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): + with pytest.raises(WebhookSignatureError): verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) def test_signature_can_be_bytes(self): @@ -254,85 +235,35 @@ def test_signature_can_be_bytes(self): assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT def test_malformed_signature_surfaces_as_webhook_error(self): - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): + with pytest.raises(WebhookSignatureError): verify_and_parse_webhook(JSON_BODY, b"\xff" * 32, API_SECRET) - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): + with pytest.raises(WebhookSignatureError): verify_and_parse_webhook(JSON_BODY, "\u2603" * 64, API_SECRET) -class TestVerifyAndParseSqs: +class TestParseSqs: def test_base64_only(self): wrapped = _b64(JSON_BODY) - sig = _sign(JSON_BODY) - assert verify_and_parse_sqs(wrapped, sig, API_SECRET) == EVENT_DICT + assert parse_sqs(wrapped) == EVENT_DICT def test_base64_plus_gzip(self): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) - assert verify_and_parse_sqs(wrapped, sig, API_SECRET) == EVENT_DICT + assert parse_sqs(wrapped) == EVENT_DICT - def test_signature_mismatch_raises(self): - wrapped = _b64(_gzip(JSON_BODY)) - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): - verify_and_parse_sqs(wrapped, "0" * 64, API_SECRET) - - def test_signature_over_compressed_or_wrapped_bytes_rejected(self): - wrapped = _b64(_gzip(JSON_BODY)) - sig_over_wrapped = _sign(wrapped.encode("ascii")) - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): - verify_and_parse_sqs(wrapped, sig_over_wrapped, API_SECRET) - - def test_verify_and_parse_sqs_without_signature_parses(self): - assert verify_and_parse_sqs(_b64(JSON_BODY)) == EVENT_DICT - assert verify_and_parse_sqs(_b64(_gzip(JSON_BODY))) == EVENT_DICT - assert verify_and_parse_sqs(_b64(_gzip(JSON_BODY)).encode()) == EVENT_DICT - def test_static_verify_and_parse_sqs_raises_on_partial_creds(self): - wrapped = _b64(_gzip(JSON_BODY)) - with pytest.raises( - InvalidWebhookError, - match=r"signature and secret must both be provided", - ): - verify_and_parse_sqs(wrapped, _sign(JSON_BODY)) - with pytest.raises( - InvalidWebhookError, - match=r"signature and secret must both be provided", - ): - verify_and_parse_sqs(wrapped, secret=API_SECRET) - - -class TestVerifyAndParseSns: +class TestParseSns: def test_pre_extracted_message_round_trip(self): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) - assert verify_and_parse_sns(wrapped, sig, API_SECRET) == EVENT_DICT + assert parse_sns(wrapped) == EVENT_DICT def test_matches_sqs_behaviour_for_pre_extracted_message(self): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) - assert verify_and_parse_sns(wrapped, sig, API_SECRET) == verify_and_parse_sqs( - wrapped, sig, API_SECRET - ) + assert parse_sns(wrapped) == parse_sqs(wrapped) def test_full_sns_envelope(self): wrapped = _b64(_gzip(JSON_BODY)) envelope = _sns_envelope(wrapped) - sig = _sign(JSON_BODY) - assert verify_and_parse_sns(envelope, sig, API_SECRET) == EVENT_DICT - - def test_rejects_signature_over_envelope(self): - wrapped = _b64(_gzip(JSON_BODY)) - envelope = _sns_envelope(wrapped) - sig_over_envelope = _sign(envelope.encode("utf-8")) - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): - verify_and_parse_sns(envelope, sig_over_envelope, API_SECRET) - - def test_verify_and_parse_sns_without_signature_parses(self): - wrapped = _b64(_gzip(JSON_BODY)) - envelope = _sns_envelope(wrapped) - assert verify_and_parse_sns(envelope) == EVENT_DICT - assert verify_and_parse_sns(wrapped) == EVENT_DICT - assert verify_and_parse_sns(_b64(JSON_BODY)) == EVENT_DICT + assert parse_sns(envelope) == EVENT_DICT class TestSyncClientMethods: @@ -340,27 +271,18 @@ def test_verify_and_parse_webhook(self, sync_client: StreamChat): sig = _sign(JSON_BODY) assert sync_client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT - def test_verify_and_parse_sqs(self, sync_client: StreamChat): + def test_parse_sqs(self, sync_client: StreamChat): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) - assert sync_client.verify_and_parse_sqs(wrapped, sig) == EVENT_DICT + assert sync_client.parse_sqs(wrapped) == EVENT_DICT - def test_verify_and_parse_sns(self, sync_client: StreamChat): + def test_parse_sns(self, sync_client: StreamChat): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) - assert sync_client.verify_and_parse_sns(wrapped, sig) == EVENT_DICT + assert sync_client.parse_sns(wrapped) == EVENT_DICT def test_signature_mismatch_via_client(self, sync_client: StreamChat): - with pytest.raises(InvalidWebhookError, match=r"signature mismatch"): + with pytest.raises(WebhookSignatureError): sync_client.verify_and_parse_webhook(JSON_BODY, "0" * 64) - def test_instance_verify_and_parse_sqs_without_signature(self): - client = StreamChat(api_key=API_KEY, api_secret="") - wrapped = _b64(_gzip(JSON_BODY)) - envelope = _sns_envelope(wrapped) - assert client.verify_and_parse_sqs(wrapped) == EVENT_DICT - assert client.verify_and_parse_sns(envelope) == EVENT_DICT - class TestSyncClientLegacyVerifyWebhook: """The legacy boolean helper stays unchanged for backward compatibility.""" @@ -378,14 +300,12 @@ async def test_verify_and_parse_webhook(self): async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: assert client.verify_and_parse_webhook(_gzip(JSON_BODY), sig) == EVENT_DICT - async def test_verify_and_parse_sqs(self): + async def test_parse_sqs(self): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: - assert client.verify_and_parse_sqs(wrapped, sig) == EVENT_DICT + assert client.parse_sqs(wrapped) == EVENT_DICT - async def test_verify_and_parse_sns(self): + async def test_parse_sns(self): wrapped = _b64(_gzip(JSON_BODY)) - sig = _sign(JSON_BODY) async with StreamChatAsync(api_key=API_KEY, api_secret=API_SECRET) as client: - assert client.verify_and_parse_sns(wrapped, sig) == EVENT_DICT + assert client.parse_sns(wrapped) == EVENT_DICT diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 03f0b662..40440c14 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -6,8 +6,7 @@ https://getstream.io/chat/docs/node/webhooks_overview/. The composite functions (:func:`verify_and_parse_webhook`, -:func:`verify_and_parse_sqs`, :func:`verify_and_parse_sns`) are the -recommended entry points. The primitives they compose are exposed so +:func:`parse_sqs`, :func:`parse_sns`) are the recommended entry points. The primitives they compose are exposed so callers can build custom flows or run individual steps in isolation. The Python SDK currently returns the parsed JSON as a ``dict``; typed @@ -19,35 +18,11 @@ import hashlib import hmac import json -import zlib from typing import Any, Dict, Optional, Union -GZIP_MAGIC = b"\x1f\x8b" - -INVALID_WEBHOOK_SIGNATURE_MISMATCH = "signature mismatch" -INVALID_WEBHOOK_INVALID_BASE64 = "invalid base64 encoding" -INVALID_WEBHOOK_GZIP_FAILED = "gzip decompression failed" -INVALID_WEBHOOK_INVALID_JSON = "invalid JSON payload" -INVALID_WEBHOOK_PARTIAL_AWS_CREDS = ( - "signature and secret must both be provided to verify the SQS/SNS payload" -) - - -class InvalidWebhookError(Exception): - """Raised by every webhook primitive when verification or decoding - fails. The cross-SDK contract is "one exception, message says why" - - callers branch on the message text when they need mode-specific - behaviour (signature mismatch vs invalid base64 vs corrupt gzip vs - malformed JSON). - """ - - def __init__(self, message: str) -> None: - super().__init__(message) - self.message = message - - def __str__(self) -> str: - return f"InvalidWebhookError: {self.message}" +from stream_chat.base.exceptions import WebhookSignatureError +GZIP_MAGIC = b"\x1f\x8b" _BytesLike = Union[bytes, bytearray, memoryview, str] @@ -60,7 +35,7 @@ def _to_bytes(body: _BytesLike) -> bytes: raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") -def gunzip_payload(body: _BytesLike) -> bytes: +def ungzip_payload(body: _BytesLike) -> bytes: """Return ``body`` unchanged unless it starts with the gzip magic (``1f 8b``, per RFC 1952), in which case the gzip stream is decompressed. @@ -73,8 +48,8 @@ def gunzip_payload(body: _BytesLike) -> bytes: return raw try: return gzip.decompress(raw) - except (gzip.BadGzipFile, OSError, EOFError, zlib.error) as err: - raise InvalidWebhookError(INVALID_WEBHOOK_GZIP_FAILED) from err + except (gzip.BadGzipFile, OSError, EOFError) as exc: + raise WebhookSignatureError(f"failed to decompress gzip payload: {exc}") def decode_sqs_payload(body: _BytesLike) -> bytes: @@ -89,9 +64,9 @@ def decode_sqs_payload(body: _BytesLike) -> bytes: raw = _to_bytes(body) try: decoded = base64.b64decode(raw, validate=True) - except ValueError as err: - raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_BASE64) from err - return gunzip_payload(decoded) + except ValueError as exc: + raise WebhookSignatureError(f"failed to base64-decode payload: {exc}") + return ungzip_payload(decoded) def decode_sns_payload(notification_body: _BytesLike) -> bytes: @@ -165,11 +140,8 @@ def parse_event(payload: _BytesLike) -> Dict[str, Any]: without changing call sites. """ if isinstance(payload, (bytes, bytearray, memoryview)): - payload = bytes(payload) - try: - return json.loads(payload) - except json.JSONDecodeError as err: - raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_JSON) from err + return json.loads(bytes(payload)) + return json.loads(payload) def _verify_and_parse( @@ -178,22 +150,10 @@ def _verify_and_parse( secret: str, ) -> Dict[str, Any]: if not verify_signature(payload_bytes, signature, secret): - raise InvalidWebhookError(INVALID_WEBHOOK_SIGNATURE_MISMATCH) + raise WebhookSignatureError("invalid webhook signature") return parse_event(payload_bytes) -def _maybe_verify_and_parse( - payload_bytes: bytes, - signature: Optional[Union[str, bytes]], - secret: Optional[str], -) -> Dict[str, Any]: - if not signature and not secret: - return parse_event(payload_bytes) - if not signature or not secret: - raise InvalidWebhookError(INVALID_WEBHOOK_PARTIAL_AWS_CREDS) - return _verify_and_parse(payload_bytes, signature, secret) - - def verify_and_parse_webhook( body: _BytesLike, signature: Union[str, bytes], @@ -205,59 +165,23 @@ def verify_and_parse_webhook( :param body: raw HTTP request body bytes Stream signed :param signature: ``X-Signature`` header value :param secret: the app's API secret - :raises InvalidWebhookError: on signature mismatch or any decode error + :raises WebhookSignatureError: on signature mismatch or decode error """ - inflated = gunzip_payload(body) + inflated = ungzip_payload(body) return _verify_and_parse(inflated, signature, secret) -def verify_and_parse_sqs( - message_body: _BytesLike, - signature: Optional[Union[str, bytes]] = None, - secret: Optional[str] = None, -) -> Dict[str, Any]: - """Decode the SQS ``Body`` (base64, then gzip-if-magic) and return - the parsed event. - - Stream does not attach an ``X-Signature`` to SQS deliveries: the - transport is an IAM-authenticated AWS queue, so the queue ARN - already proves origin. HMAC verification on top is redundant and - is therefore optional. When ``signature`` and ``secret`` are both - supplied the legacy verification pipeline still runs, so existing - callers keep working unchanged. - - :param message_body: SQS message ``Body`` (string) - :param signature: optional ``X-Signature`` message attribute value - :param secret: optional API secret matching ``signature`` - :raises InvalidWebhookError: on signature mismatch, any decode - error, or when only one of ``signature`` / ``secret`` is given - """ +def parse_sqs(message_body: _BytesLike) -> Dict[str, Any]: + """Decode the SQS ``Body`` (base64, then gzip-if-magic) and return the + parsed event. Stream does not HMAC-sign SQS message bodies.""" + inflated = decode_sqs_payload(message_body) - return _maybe_verify_and_parse(inflated, signature, secret) + return parse_event(inflated) -def verify_and_parse_sns( - notification_body: _BytesLike, - signature: Optional[Union[str, bytes]] = None, - secret: Optional[str] = None, -) -> Dict[str, Any]: - """Decode the SNS ``Message`` (identical to SQS handling) and return - the parsed event. - - Stream does not attach an ``X-Signature`` to SNS deliveries: AWS - already signs the SNS notification envelope, so verifying that the - request really came from your topic happens at the SNS layer. - HMAC verification on top is optional. When ``signature`` and - ``secret`` are both supplied the legacy verification pipeline still - runs, so existing callers keep working unchanged. - - :param notification_body: raw SNS notification body (the full - ``{"Type":"Notification", ...}`` JSON envelope, or a - pre-extracted ``Message`` string) - :param signature: optional ``X-Signature`` message attribute value - :param secret: optional API secret matching ``signature`` - :raises InvalidWebhookError: on signature mismatch, any decode - error, or when only one of ``signature`` / ``secret`` is given - """ - inflated = decode_sns_payload(notification_body) - return _maybe_verify_and_parse(inflated, signature, secret) +def parse_sns(message: _BytesLike) -> Dict[str, Any]: + """Decode an SNS-delivered payload (unwraps envelope JSON when present, + then same path as :func:`parse_sqs`). No verification step.""" + + inflated = decode_sns_payload(message) + return parse_event(inflated) From 08b893d7574b69b5f72c44fbfc271c8234d96d1f Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Tue, 12 May 2026 15:13:42 +0200 Subject: [PATCH 14/15] fix(webhooks): Go ErrInvalidWebhook + VerifySignature(error); Ruby WebhookSignatureError + parse_*; Python WebhookSignatureError; guard test init without STREAM_* --- stream_chat/base/exceptions.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/stream_chat/base/exceptions.py b/stream_chat/base/exceptions.py index 9ed295bb..028295b3 100644 --- a/stream_chat/base/exceptions.py +++ b/stream_chat/base/exceptions.py @@ -6,6 +6,10 @@ class StreamChannelException(Exception): pass +class WebhookSignatureError(Exception): + """Invalid webhook signature or malformed gzip/base64 envelope.""" + + class StreamAPIException(Exception): def __init__(self, text: str, status_code: int) -> None: self.response_text = text From 28d801594cf2c24c2b0934e62e7adff9ff070aaa Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Tue, 12 May 2026 15:56:49 +0200 Subject: [PATCH 15/15] =?UTF-8?q?feat(webhooks):=20align=20cross-SDK=20con?= =?UTF-8?q?tract=20=E2=80=94=20InvalidWebhookError=20+=20gunzip=5Fpayload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename WebhookSignatureError → InvalidWebhookError - Rename ungzip_payload → gunzip_payload - Align error messages to documented strings: signature mismatch / invalid base64 encoding / gzip decompression failed / invalid JSON payload - parse_event now wraps json.JSONDecodeError as InvalidWebhookError - Export INVALID_WEBHOOK_* constants for exact-match filtering --- stream_chat/base/client.py | 2 +- stream_chat/base/exceptions.py | 17 +++++- stream_chat/tests/test_webhook_compression.py | 59 +++++++++++-------- stream_chat/webhook.py | 31 ++++++---- 4 files changed, 69 insertions(+), 40 deletions(-) diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index 1639e846..4c17bbb7 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -147,7 +147,7 @@ def verify_and_parse_webhook( :param body: raw HTTP request body bytes Stream signed :param signature: ``X-Signature`` header value - :raises stream_chat.base.exceptions.WebhookSignatureError: on + :raises stream_chat.base.exceptions.InvalidWebhookError: on signature mismatch or any decode error """ from stream_chat.webhook import verify_and_parse_webhook diff --git a/stream_chat/base/exceptions.py b/stream_chat/base/exceptions.py index 028295b3..6a592d8c 100644 --- a/stream_chat/base/exceptions.py +++ b/stream_chat/base/exceptions.py @@ -6,8 +6,21 @@ class StreamChannelException(Exception): pass -class WebhookSignatureError(Exception): - """Invalid webhook signature or malformed gzip/base64 envelope.""" +class InvalidWebhookError(Exception): + """Invalid webhook signature or malformed gzip/base64/JSON envelope. + + Raised by :mod:`stream_chat.webhook` on any failure path: signature + mismatch, malformed base64, gzip decompression failure, or invalid + JSON payload. The message text identifies the failure mode so + callers that want to differentiate (security logging, retry policy) + can filter on substring or on the module-level constants. + """ + + +INVALID_WEBHOOK_SIGNATURE_MISMATCH = "signature mismatch" +INVALID_WEBHOOK_INVALID_BASE64 = "invalid base64 encoding" +INVALID_WEBHOOK_GZIP_FAILED = "gzip decompression failed" +INVALID_WEBHOOK_INVALID_JSON = "invalid JSON payload" class StreamAPIException(Exception): diff --git a/stream_chat/tests/test_webhook_compression.py b/stream_chat/tests/test_webhook_compression.py index 65ca87d8..c22853fd 100644 --- a/stream_chat/tests/test_webhook_compression.py +++ b/stream_chat/tests/test_webhook_compression.py @@ -4,7 +4,7 @@ * Module-level functions in :mod:`stream_chat.webhook`: - * primitives - ``ungzip_payload``, ``decode_sqs_payload``, + * primitives - ``gunzip_payload``, ``decode_sqs_payload``, ``decode_sns_payload``, ``verify_signature``, ``parse_event`` * composite helpers - ``verify_and_parse_webhook``, ``parse_sqs``, ``parse_sns`` @@ -27,15 +27,21 @@ import pytest from stream_chat import StreamChat, StreamChatAsync -from stream_chat.base.exceptions import WebhookSignatureError +from stream_chat.base.exceptions import ( + INVALID_WEBHOOK_GZIP_FAILED, + INVALID_WEBHOOK_INVALID_BASE64, + INVALID_WEBHOOK_INVALID_JSON, + INVALID_WEBHOOK_SIGNATURE_MISMATCH, + InvalidWebhookError, +) from stream_chat.webhook import ( GZIP_MAGIC, decode_sns_payload, decode_sqs_payload, + gunzip_payload, parse_event, parse_sns, parse_sqs, - ungzip_payload, verify_and_parse_webhook, verify_signature, ) @@ -66,31 +72,31 @@ def sync_client() -> StreamChat: return StreamChat(api_key=API_KEY, api_secret=API_SECRET) -class TestUngzipPayload: +class TestGunzipPayload: def test_passthrough_plain_bytes(self): - assert ungzip_payload(JSON_BODY) == JSON_BODY + assert gunzip_payload(JSON_BODY) == JSON_BODY def test_passthrough_str_input(self): - assert ungzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY + assert gunzip_payload(JSON_BODY.decode("utf-8")) == JSON_BODY def test_inflates_gzip_bytes(self): - assert ungzip_payload(_gzip(JSON_BODY)) == JSON_BODY + assert gunzip_payload(_gzip(JSON_BODY)) == JSON_BODY def test_returns_bytes(self): - assert isinstance(ungzip_payload(JSON_BODY), bytes) - assert isinstance(ungzip_payload(_gzip(JSON_BODY)), bytes) + assert isinstance(gunzip_payload(JSON_BODY), bytes) + assert isinstance(gunzip_payload(_gzip(JSON_BODY)), bytes) def test_empty_input(self): - assert ungzip_payload(b"") == b"" + assert gunzip_payload(b"") == b"" def test_short_input_below_magic_length(self): - assert ungzip_payload(b"ab") == b"ab" + assert gunzip_payload(b"ab") == b"ab" def test_truncated_gzip_with_magic_raises(self): bad = GZIP_MAGIC + b"\x00\x00\x00" - with pytest.raises(WebhookSignatureError) as exc_info: - ungzip_payload(bad) - assert "decompress" in str(exc_info.value).lower() + with pytest.raises(InvalidWebhookError) as exc_info: + gunzip_payload(bad) + assert str(exc_info.value) == INVALID_WEBHOOK_GZIP_FAILED class TestDecodeSqsPayload: @@ -110,9 +116,9 @@ def test_accepts_bytes_input(self): assert decode_sqs_payload(encoded) == JSON_BODY def test_invalid_base64_raises(self): - with pytest.raises(WebhookSignatureError) as exc_info: + with pytest.raises(InvalidWebhookError) as exc_info: decode_sqs_payload("!!!not-valid-base64!!!") - assert "base64" in str(exc_info.value).lower() + assert str(exc_info.value) == INVALID_WEBHOOK_INVALID_BASE64 def _sns_envelope(inner_message: str) -> str: @@ -178,7 +184,7 @@ def test_non_ascii_bytes_signature_returns_false(self): assert verify_signature(JSON_BODY, b"\xff" * 32, API_SECRET) is False def test_non_ascii_str_signature_returns_false(self): - assert verify_signature(JSON_BODY, "\u2603" * 64, API_SECRET) is False + assert verify_signature(JSON_BODY, "☃" * 64, API_SECRET) is False def test_non_string_signature_returns_false(self): assert verify_signature(JSON_BODY, 12345, API_SECRET) is False # type: ignore[arg-type] @@ -196,8 +202,9 @@ def test_unknown_event_type_still_parses(self): assert parse_event(body) == {"type": "a.future.event", "custom": 42} def test_malformed_json_raises(self): - with pytest.raises(json.JSONDecodeError): + with pytest.raises(InvalidWebhookError) as exc_info: parse_event(b"not json") + assert str(exc_info.value) == INVALID_WEBHOOK_INVALID_JSON class TestVerifyAndParseWebhook: @@ -215,19 +222,19 @@ def test_returns_dict(self): assert isinstance(result, dict) def test_signature_mismatch_raises(self): - with pytest.raises(WebhookSignatureError) as exc_info: + with pytest.raises(InvalidWebhookError) as exc_info: verify_and_parse_webhook(JSON_BODY, "0" * 64, API_SECRET) - assert "invalid webhook signature" in str(exc_info.value).lower() + assert str(exc_info.value) == INVALID_WEBHOOK_SIGNATURE_MISMATCH def test_signature_must_be_over_uncompressed_bytes(self): compressed = _gzip(JSON_BODY) sig_over_compressed = _sign(compressed) - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError): verify_and_parse_webhook(compressed, sig_over_compressed, API_SECRET) def test_wrong_secret_raises(self): sig = _sign(JSON_BODY, secret="other") - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError): verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) def test_signature_can_be_bytes(self): @@ -235,10 +242,10 @@ def test_signature_can_be_bytes(self): assert verify_and_parse_webhook(JSON_BODY, sig, API_SECRET) == EVENT_DICT def test_malformed_signature_surfaces_as_webhook_error(self): - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError): verify_and_parse_webhook(JSON_BODY, b"\xff" * 32, API_SECRET) - with pytest.raises(WebhookSignatureError): - verify_and_parse_webhook(JSON_BODY, "\u2603" * 64, API_SECRET) + with pytest.raises(InvalidWebhookError): + verify_and_parse_webhook(JSON_BODY, "☃" * 64, API_SECRET) class TestParseSqs: @@ -280,7 +287,7 @@ def test_parse_sns(self, sync_client: StreamChat): assert sync_client.parse_sns(wrapped) == EVENT_DICT def test_signature_mismatch_via_client(self, sync_client: StreamChat): - with pytest.raises(WebhookSignatureError): + with pytest.raises(InvalidWebhookError): sync_client.verify_and_parse_webhook(JSON_BODY, "0" * 64) diff --git a/stream_chat/webhook.py b/stream_chat/webhook.py index 40440c14..032aa683 100644 --- a/stream_chat/webhook.py +++ b/stream_chat/webhook.py @@ -20,7 +20,13 @@ import json from typing import Any, Dict, Optional, Union -from stream_chat.base.exceptions import WebhookSignatureError +from stream_chat.base.exceptions import ( + INVALID_WEBHOOK_GZIP_FAILED, + INVALID_WEBHOOK_INVALID_BASE64, + INVALID_WEBHOOK_INVALID_JSON, + INVALID_WEBHOOK_SIGNATURE_MISMATCH, + InvalidWebhookError, +) GZIP_MAGIC = b"\x1f\x8b" @@ -35,7 +41,7 @@ def _to_bytes(body: _BytesLike) -> bytes: raise TypeError(f"webhook body must be bytes or str, got {type(body).__name__}") -def ungzip_payload(body: _BytesLike) -> bytes: +def gunzip_payload(body: _BytesLike) -> bytes: """Return ``body`` unchanged unless it starts with the gzip magic (``1f 8b``, per RFC 1952), in which case the gzip stream is decompressed. @@ -49,7 +55,7 @@ def ungzip_payload(body: _BytesLike) -> bytes: try: return gzip.decompress(raw) except (gzip.BadGzipFile, OSError, EOFError) as exc: - raise WebhookSignatureError(f"failed to decompress gzip payload: {exc}") + raise InvalidWebhookError(INVALID_WEBHOOK_GZIP_FAILED) from exc def decode_sqs_payload(body: _BytesLike) -> bytes: @@ -65,8 +71,8 @@ def decode_sqs_payload(body: _BytesLike) -> bytes: try: decoded = base64.b64decode(raw, validate=True) except ValueError as exc: - raise WebhookSignatureError(f"failed to base64-decode payload: {exc}") - return ungzip_payload(decoded) + raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_BASE64) from exc + return gunzip_payload(decoded) def decode_sns_payload(notification_body: _BytesLike) -> bytes: @@ -139,9 +145,12 @@ def parse_event(payload: _BytesLike) -> Dict[str, Any]: documented primitive so callers can swap in a typed parser later without changing call sites. """ - if isinstance(payload, (bytes, bytearray, memoryview)): - return json.loads(bytes(payload)) - return json.loads(payload) + try: + if isinstance(payload, (bytes, bytearray, memoryview)): + return json.loads(bytes(payload)) + return json.loads(payload) + except (json.JSONDecodeError, ValueError) as exc: + raise InvalidWebhookError(INVALID_WEBHOOK_INVALID_JSON) from exc def _verify_and_parse( @@ -150,7 +159,7 @@ def _verify_and_parse( secret: str, ) -> Dict[str, Any]: if not verify_signature(payload_bytes, signature, secret): - raise WebhookSignatureError("invalid webhook signature") + raise InvalidWebhookError(INVALID_WEBHOOK_SIGNATURE_MISMATCH) return parse_event(payload_bytes) @@ -165,9 +174,9 @@ def verify_and_parse_webhook( :param body: raw HTTP request body bytes Stream signed :param signature: ``X-Signature`` header value :param secret: the app's API secret - :raises WebhookSignatureError: on signature mismatch or decode error + :raises InvalidWebhookError: on signature mismatch or decode error """ - inflated = ungzip_payload(body) + inflated = gunzip_payload(body) return _verify_and_parse(inflated, signature, secret)