From 9f00fcdf57b4d8ca451d01b610f8f1c1597ff750 Mon Sep 17 00:00:00 2001 From: nijeeshjoshy Date: Wed, 13 May 2026 13:45:58 +0200 Subject: [PATCH] chore(tests): stop leaking channels on cleanup failure The CI test suite creates ~94 channels per run (~50 sync + ~44 async) across fixtures and explicit cases, but cleanup historically ran through three escape hatches that silently leaked channels into the shared test app: 1. The channel fixture used client.delete_channels([cid], hard_delete=True), which returns a task_id immediately. When the task queue is backed up (the same backend slowness that breaks test_delete_channels), the fixture reports 'done' before the delete actually runs. 2. Every cleanup path swallowed exceptions with `except Exception: pass`, so failures were invisible. 3. test_query_drafts (sync + async) created a second channel, channel2, and did not delete it on the failure path. Stale channels accumulate across runs and eventually break unrelated tests that query the shared app expecting clean state. Three fixes: * Swap client.delete_channels for the synchronous channel.delete(hard=True) HTTP DELETE in the channel fixtures (sync and async). No task_id race; the channel is gone before the fixture returns. * Log cleanup exceptions to stderr ('[cleanup] channel <cid> failed: ...'). Visible in CI logs, doesn't fail the test. * Add a session-scoped, autouse=True sweep that queries channels tagged {'test': True, 'language': 'python'} (up to 30 per pass) at the start and end of the run and hard-deletes, via client.delete_channels, whatever is still hanging around. Self-healing across runs; wrap the body of test_query_drafts in try/finally so channel2 is hard-deleted on the failure path, and tag channel2 so the sweep catches it too. 
--- stream_chat/tests/async_chat/conftest.py | 88 +++++++++++--- stream_chat/tests/async_chat/test_draft.py | 133 ++++++++++++--------- stream_chat/tests/conftest.py | 102 +++++++++++++--- stream_chat/tests/test_draft.py | 133 ++++++++++++--------- 4 files changed, 306 insertions(+), 150 deletions(-) diff --git a/stream_chat/tests/async_chat/conftest.py b/stream_chat/tests/async_chat/conftest.py index ddb4486..0fa19a1 100644 --- a/stream_chat/tests/async_chat/conftest.py +++ b/stream_chat/tests/async_chat/conftest.py @@ -1,5 +1,6 @@ import asyncio import os +import sys import uuid from typing import Dict, List @@ -26,6 +27,14 @@ def pytest_configure(config): config.addinivalue_line("markers", "incremental: mark test incremental") +def _warn_cleanup_failure(label: str, identifier: str, exc: BaseException) -> None: + """See sync conftest for rationale; mirrored to keep both suites consistent.""" + print( + f"[cleanup] {label} {identifier} failed: {exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + + @pytest.fixture(scope="module") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() @@ -33,6 +42,51 @@ def event_loop(): loop.close() +@pytest.fixture(scope="session", autouse=True) +def _purge_stale_test_channels_async(event_loop): + """Async mirror of the sync session sweep; queries channels tagged + ``{"test": True, "language": "python"}`` and hard-deletes them at + session start and end so leaks don't compound across runs.""" + + async def sweep() -> None: + base_url = os.environ.get("STREAM_HOST") + options = {"base_url": base_url} if base_url else {} + async with StreamChatAsync( + api_key=os.environ["STREAM_KEY"], + api_secret=os.environ["STREAM_SECRET"], + timeout=10, + **options, + ) as client: + try: + response = await client.query_channels( + {"test": True, "language": "python"}, + sort=[{"field": "created_at", "direction": -1}], + limit=30, + ) + except Exception as exc: + print( + f"[cleanup] sweep query_channels failed: " + 
f"{exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + return + cids = [c["channel"]["cid"] for c in response.get("channels", [])] + if not cids: + return + try: + await client.delete_channels(cids, hard_delete=True) + print( + f"[cleanup] swept {len(cids)} leaked test channels", + file=sys.stderr, + ) + except Exception as exc: + _warn_cleanup_failure("sweep delete_channels", str(len(cids)), exc) + + event_loop.run_until_complete(sweep()) + yield + event_loop.run_until_complete(sweep()) + + @pytest.fixture(scope="function", autouse=True) @pytest.mark.asyncio async def client(): @@ -85,10 +139,12 @@ async def channel(client: StreamChatAsync, random_user: Dict): await channel.create(random_user["id"]) yield channel + # Synchronous channel.delete (HTTP DELETE), not the async-task + # delete_channels — see sync conftest for the leak rationale. try: - await client.delete_channels([channel.cid], hard_delete=True) - except Exception: - pass + await channel.delete(hard=True) + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) @pytest.fixture(scope="function") @@ -107,10 +163,10 @@ async def command(client: StreamChatAsync): ): try: await client.delete_command(cmd["name"]) - except Exception: - pass - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("stale command", cmd["name"], exc) + except Exception as exc: + _warn_cleanup_failure("list_commands", "", exc) response = await client.create_command( dict(name=str(uuid.uuid4()), description="My command") @@ -120,8 +176,8 @@ async def command(client: StreamChatAsync): try: await client.delete_command(response["command"]["name"]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("command", response["command"]["name"], exc) @pytest.fixture(scope="function") @@ -145,8 +201,8 @@ async def fellowship_of_the_ring(client: StreamChatAsync): ] try: await client.restore_users([m["id"] for m in members]) - except Exception: - pass + except Exception 
as exc: + _warn_cleanup_failure("restore_users", "fellowship", exc) await client.upsert_users(members) channel = client.channel( "team", "fellowship-of-the-ring", {"members": [m["id"] for m in members]} @@ -155,9 +211,9 @@ async def fellowship_of_the_ring(client: StreamChatAsync): yield try: await channel.delete(hard=True) - await hard_delete_users(client, [m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) + await hard_delete_users(client, [m["id"] for m in members]) async def hard_delete_users(client: StreamChatAsync, user_ids: List[str]): @@ -165,5 +221,5 @@ async def hard_delete_users(client: StreamChatAsync, user_ids: List[str]): await client.delete_users( user_ids, "hard", conversations="hard", messages="hard" ) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("delete_users", ",".join(user_ids), exc) diff --git a/stream_chat/tests/async_chat/test_draft.py b/stream_chat/tests/async_chat/test_draft.py index cd8a011..6332ee4 100644 --- a/stream_chat/tests/async_chat/test_draft.py +++ b/stream_chat/tests/async_chat/test_draft.py @@ -84,65 +84,80 @@ async def test_query_drafts( draft1 = {"text": "Draft in channel 1"} await channel.create_draft(draft1, random_user["id"]) - # Create another channel with a draft - channel2 = client.channel("messaging", str(uuid.uuid4())) - await channel2.create(random_user["id"]) - - draft2 = {"text": "Draft in channel 2"} - await channel2.create_draft(draft2, random_user["id"]) - - # Query all drafts for the user - response = await client.query_drafts(random_user["id"]) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - - # Query drafts for a specific channel - response = await client.query_drafts( - random_user["id"], filter={"channel_cid": channel2.cid} + # Create another channel with a draft. 
Tag with {"test": True, + # "language": "python"} so the session-level GC sweep in conftest + # catches it if cleanup at the end of this test doesn't run. + channel2 = client.channel( + "messaging", + str(uuid.uuid4()), + {"test": True, "language": "python"}, ) + await channel2.create(random_user["id"]) - assert "drafts" in response - assert len(response["drafts"]) == 1 - draft = response["drafts"][0] - assert draft["channel_cid"] == channel2.cid - assert draft["message"]["text"] == "Draft in channel 2" - - # Query drafts with sort - response = await client.query_drafts( - random_user["id"], - sort=[{"field": "created_at", "direction": SortOrder.ASC}], - ) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - assert response["drafts"][0]["channel_cid"] == channel.cid - assert response["drafts"][1]["channel_cid"] == channel2.cid - - # Query drafts with pagination - response = await client.query_drafts( - random_user["id"], - options={"limit": 1}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel2.cid - - assert response["next"] is not None - - # Query drafts with pagination - response = await client.query_drafts( - random_user["id"], - options={"limit": 1, "next": response["next"]}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel.cid - - # Cleanup try: - await channel2.delete() - except Exception: - pass + draft2 = {"text": "Draft in channel 2"} + await channel2.create_draft(draft2, random_user["id"]) + + # Query all drafts for the user + response = await client.query_drafts(random_user["id"]) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + + # Query drafts for a specific channel + response = await client.query_drafts( + random_user["id"], filter={"channel_cid": channel2.cid} + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + draft = 
response["drafts"][0] + assert draft["channel_cid"] == channel2.cid + assert draft["message"]["text"] == "Draft in channel 2" + + # Query drafts with sort + response = await client.query_drafts( + random_user["id"], + sort=[{"field": "created_at", "direction": SortOrder.ASC}], + ) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + assert response["drafts"][0]["channel_cid"] == channel.cid + assert response["drafts"][1]["channel_cid"] == channel2.cid + + # Query drafts with pagination + response = await client.query_drafts( + random_user["id"], + options={"limit": 1}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel2.cid + + assert response["next"] is not None + + # Query drafts with pagination + response = await client.query_drafts( + random_user["id"], + options={"limit": 1, "next": response["next"]}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel.cid + finally: + # Hard-delete via the synchronous channel.delete (HTTP DELETE) + # rather than the async-task delete_channels helper. Failures + # surface in CI logs instead of being swallowed silently. + try: + await channel2.delete(hard=True) + except Exception as exc: + import sys + + print( + f"[cleanup] channel {channel2.cid} delete failed: " + f"{exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) diff --git a/stream_chat/tests/conftest.py b/stream_chat/tests/conftest.py index b18ebda..911ac64 100644 --- a/stream_chat/tests/conftest.py +++ b/stream_chat/tests/conftest.py @@ -1,4 +1,5 @@ import os +import sys import uuid from typing import Dict, List @@ -25,6 +26,72 @@ def pytest_configure(config): config.addinivalue_line("markers", "incremental: mark test incremental") +def _warn_cleanup_failure(label: str, identifier: str, exc: BaseException) -> None: + """Surface cleanup failures in CI logs. 
+ + Fixtures used to swallow every teardown exception with ``except Exception: pass``, + which silently leaked test channels / users / commands into the shared CI + app and eventually broke unrelated tests with stale-state assertions. We + can't fail the test on cleanup error (the test itself already passed) but + we can at least make the leak visible so the on-call dev knows where to + look. + """ + print( + f"[cleanup] {label} {identifier} failed: {exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + + +@pytest.fixture(scope="session", autouse=True) +def _purge_stale_test_channels(): + """Best-effort sweep of leaked test channels at session start AND end. + + Per-test fixtures tear down with synchronous ``channel.delete(hard=True)``, + but historical runs that aborted mid-test left orphans in the shared CI + app. This sweep targets channels tagged ``{"test": True, "language": + "python"}`` by the ``channel`` fixture below — anything that isn't + actively in use by another concurrent run gets hard-deleted. + + Session-scoped so it runs once per ``pytest`` invocation. autouse=True so + even tests that don't request a channel still benefit (and the next run's + quotas are healthy). 
+ """ + + def sweep(client: StreamChat) -> None: + try: + response = client.query_channels( + {"test": True, "language": "python"}, + sort=[{"field": "created_at", "direction": -1}], + limit=30, + ) + except Exception as exc: + print( + f"[cleanup] sweep query_channels failed: {exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + return + cids = [c["channel"]["cid"] for c in response.get("channels", [])] + if not cids: + return + try: + client.delete_channels(cids, hard_delete=True) + print(f"[cleanup] swept {len(cids)} leaked test channels", file=sys.stderr) + except Exception as exc: + _warn_cleanup_failure("sweep delete_channels", str(len(cids)), exc) + + base_url = os.environ.get("STREAM_HOST") + options = {"base_url": base_url} if base_url else {} + client = StreamChat( + api_key=os.environ["STREAM_KEY"], + api_secret=os.environ["STREAM_SECRET"], + timeout=10, + **options, + ) + sweep(client) + yield + sweep(client) + + @pytest.fixture(scope="module") def client(): base_url = os.environ.get("STREAM_HOST") @@ -76,10 +143,13 @@ def channel(client: StreamChat, random_user: Dict): yield channel + # Use the synchronous channel.delete (HTTP DELETE) instead of + # client.delete_channels (returns task_id, races with subsequent tests + # querying the same app). Tag failures so CI logs show the leak source. 
try: - client.delete_channels([channel.cid], hard_delete=True) - except Exception: - pass + channel.delete(hard=True) + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) @pytest.fixture(scope="function") @@ -98,10 +168,10 @@ def command(client: StreamChat): ): try: client.delete_command(cmd["name"]) - except Exception: - pass - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("stale command", cmd["name"], exc) + except Exception as exc: + _warn_cleanup_failure("list_commands", "", exc) response = client.create_command( dict(name=str(uuid.uuid4()), description="My command") @@ -111,8 +181,8 @@ def command(client: StreamChat): try: client.delete_command(response["command"]["name"]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("command", response["command"]["name"], exc) @pytest.fixture(scope="module") @@ -135,8 +205,8 @@ def fellowship_of_the_ring(client: StreamChat): ] try: client.restore_users([m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("restore_users", "fellowship", exc) client.upsert_users(members) channel = client.channel( "team", "fellowship-of-the-ring", {"members": [m["id"] for m in members]} @@ -147,13 +217,13 @@ def fellowship_of_the_ring(client: StreamChat): try: channel.delete(hard=True) - hard_delete_users(client, [m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) + hard_delete_users(client, [m["id"] for m in members]) def hard_delete_users(client: StreamChat, user_ids: List[str]): try: client.delete_users(user_ids, "hard", conversations="hard", messages="hard") - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("delete_users", ",".join(user_ids), exc) diff --git a/stream_chat/tests/test_draft.py b/stream_chat/tests/test_draft.py index 5d0d988..9aa299c 100644 --- a/stream_chat/tests/test_draft.py 
+++ b/stream_chat/tests/test_draft.py @@ -84,65 +84,80 @@ def test_query_drafts( draft1 = {"text": "Draft in channel 1"} channel.create_draft(draft1, random_user["id"]) - # Create another channel with a draft - channel2 = client.channel("messaging", str(uuid.uuid4())) - channel2.create(random_user["id"]) - - draft2 = {"text": "Draft in channel 2"} - channel2.create_draft(draft2, random_user["id"]) - - # Query all drafts for the user - response = client.query_drafts(random_user["id"]) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - - # Query drafts for a specific channel - response = client.query_drafts( - random_user["id"], filter={"channel_cid": channel2.cid} + # Create another channel with a draft. Tag with {"test": True, + # "language": "python"} so the session-level GC sweep in conftest + # catches it if cleanup at the end of this test doesn't run. + channel2 = client.channel( + "messaging", + str(uuid.uuid4()), + {"test": True, "language": "python"}, ) + channel2.create(random_user["id"]) - assert "drafts" in response - assert len(response["drafts"]) == 1 - draft = response["drafts"][0] - assert draft["channel_cid"] == channel2.cid - assert draft["message"]["text"] == "Draft in channel 2" - - # Query drafts with sort - response = client.query_drafts( - random_user["id"], - sort=[{"field": "created_at", "direction": SortOrder.ASC}], - ) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - assert response["drafts"][0]["channel_cid"] == channel.cid - assert response["drafts"][1]["channel_cid"] == channel2.cid - - # Query drafts with pagination - response = client.query_drafts( - random_user["id"], - options={"limit": 1}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel2.cid - - assert response["next"] is not None - - # Query drafts with pagination - response = client.query_drafts( - random_user["id"], - options={"limit": 1, "next": 
response["next"]}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel.cid - - # cleanup try: - channel2.delete() - except Exception: - pass + draft2 = {"text": "Draft in channel 2"} + channel2.create_draft(draft2, random_user["id"]) + + # Query all drafts for the user + response = client.query_drafts(random_user["id"]) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + + # Query drafts for a specific channel + response = client.query_drafts( + random_user["id"], filter={"channel_cid": channel2.cid} + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + draft = response["drafts"][0] + assert draft["channel_cid"] == channel2.cid + assert draft["message"]["text"] == "Draft in channel 2" + + # Query drafts with sort + response = client.query_drafts( + random_user["id"], + sort=[{"field": "created_at", "direction": SortOrder.ASC}], + ) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + assert response["drafts"][0]["channel_cid"] == channel.cid + assert response["drafts"][1]["channel_cid"] == channel2.cid + + # Query drafts with pagination + response = client.query_drafts( + random_user["id"], + options={"limit": 1}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel2.cid + + assert response["next"] is not None + + # Query drafts with pagination + response = client.query_drafts( + random_user["id"], + options={"limit": 1, "next": response["next"]}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel.cid + finally: + # Hard-delete via the synchronous channel.delete (HTTP DELETE) + # rather than the async-task delete_channels helper. Failures + # surface in CI logs instead of being swallowed silently. 
+ try: + channel2.delete(hard=True) + except Exception as exc: + import sys + + print( + f"[cleanup] channel {channel2.cid} delete failed: " + f"{exc.__class__.__name__}: {exc}", + file=sys.stderr, + )