diff --git a/stream_chat/tests/async_chat/conftest.py b/stream_chat/tests/async_chat/conftest.py index ddb4486..0fa19a1 100644 --- a/stream_chat/tests/async_chat/conftest.py +++ b/stream_chat/tests/async_chat/conftest.py @@ -1,5 +1,6 @@ import asyncio import os +import sys import uuid from typing import Dict, List @@ -26,6 +27,14 @@ def pytest_configure(config): config.addinivalue_line("markers", "incremental: mark test incremental") +def _warn_cleanup_failure(label: str, identifier: str, exc: BaseException) -> None: + """See sync conftest for rationale; mirrored to keep both suites consistent.""" + print( + f"[cleanup] {label} {identifier} failed: {exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + + @pytest.fixture(scope="module") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() @@ -33,6 +42,51 @@ def event_loop(): loop.close() +@pytest.fixture(scope="session", autouse=True) +def _purge_stale_test_channels_async(event_loop): + """Async mirror of the sync session sweep; queries channels tagged + ``{"test": True, "language": "python"}`` and hard-deletes them at + session start and end so leaks don't compound across runs.""" + + async def sweep() -> None: + base_url = os.environ.get("STREAM_HOST") + options = {"base_url": base_url} if base_url else {} + async with StreamChatAsync( + api_key=os.environ["STREAM_KEY"], + api_secret=os.environ["STREAM_SECRET"], + timeout=10, + **options, + ) as client: + try: + response = await client.query_channels( + {"test": True, "language": "python"}, + sort=[{"field": "created_at", "direction": -1}], + limit=30, + ) + except Exception as exc: + print( + f"[cleanup] sweep query_channels failed: " + f"{exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + return + cids = [c["channel"]["cid"] for c in response.get("channels", [])] + if not cids: + return + try: + await client.delete_channels(cids, hard_delete=True) + print( + f"[cleanup] swept {len(cids)} leaked test channels", + file=sys.stderr, + ) + except Exception as exc: + _warn_cleanup_failure("sweep delete_channels", str(len(cids)), exc) + + event_loop.run_until_complete(sweep()) + yield + event_loop.run_until_complete(sweep()) + + @pytest.fixture(scope="function", autouse=True) @pytest.mark.asyncio async def client(): @@ -85,10 +139,12 @@ async def channel(client: StreamChatAsync, random_user: Dict): await channel.create(random_user["id"]) yield channel + # Synchronous channel.delete (HTTP DELETE), not the async-task + # delete_channels — see sync conftest for the leak rationale. try: - await client.delete_channels([channel.cid], hard_delete=True) - except Exception: - pass + await channel.delete(hard=True) + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) @pytest.fixture(scope="function") @@ -107,10 +163,10 @@ async def command(client: StreamChatAsync): ): try: await client.delete_command(cmd["name"]) - except Exception: - pass - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("stale command", cmd["name"], exc) + except Exception as exc: + _warn_cleanup_failure("list_commands", "", exc) response = await client.create_command( dict(name=str(uuid.uuid4()), description="My command") @@ -120,8 +176,8 @@ async def command(client: StreamChatAsync): try: await client.delete_command(response["command"]["name"]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("command", response["command"]["name"], exc) @pytest.fixture(scope="function") @@ -145,8 +201,8 @@ async def fellowship_of_the_ring(client: StreamChatAsync): ] try: await client.restore_users([m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("restore_users", "fellowship", exc) await client.upsert_users(members) channel = client.channel( "team", "fellowship-of-the-ring", {"members": [m["id"] for m in members]} @@ -155,9 +211,9 @@ async def fellowship_of_the_ring(client: StreamChatAsync): yield try: await channel.delete(hard=True) - await hard_delete_users(client, [m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) + await hard_delete_users(client, [m["id"] for m in members]) async def hard_delete_users(client: StreamChatAsync, user_ids: List[str]): @@ -165,5 +221,5 @@ async def hard_delete_users(client: StreamChatAsync, user_ids: List[str]): await client.delete_users( user_ids, "hard", conversations="hard", messages="hard" ) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("delete_users", ",".join(user_ids), exc) diff --git a/stream_chat/tests/async_chat/test_draft.py b/stream_chat/tests/async_chat/test_draft.py index cd8a011..6332ee4 100644 --- a/stream_chat/tests/async_chat/test_draft.py +++ b/stream_chat/tests/async_chat/test_draft.py @@ -84,65 +84,80 @@ async def test_query_drafts( draft1 = {"text": "Draft in channel 1"} await channel.create_draft(draft1, random_user["id"]) - # Create another channel with a draft - channel2 = client.channel("messaging", str(uuid.uuid4())) - await channel2.create(random_user["id"]) - - draft2 = {"text": "Draft in channel 2"} - await channel2.create_draft(draft2, random_user["id"]) - - # Query all drafts for the user - response = await client.query_drafts(random_user["id"]) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - - # Query drafts for a specific channel - response = await client.query_drafts( - random_user["id"], filter={"channel_cid": channel2.cid} + # Create another channel with a draft. Tag with {"test": True, + # "language": "python"} so the session-level GC sweep in conftest + # catches it if cleanup at the end of this test doesn't run. + channel2 = client.channel( + "messaging", + str(uuid.uuid4()), + {"test": True, "language": "python"}, ) + await channel2.create(random_user["id"]) - assert "drafts" in response - assert len(response["drafts"]) == 1 - draft = response["drafts"][0] - assert draft["channel_cid"] == channel2.cid - assert draft["message"]["text"] == "Draft in channel 2" - - # Query drafts with sort - response = await client.query_drafts( - random_user["id"], - sort=[{"field": "created_at", "direction": SortOrder.ASC}], - ) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - assert response["drafts"][0]["channel_cid"] == channel.cid - assert response["drafts"][1]["channel_cid"] == channel2.cid - - # Query drafts with pagination - response = await client.query_drafts( - random_user["id"], - options={"limit": 1}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel2.cid - - assert response["next"] is not None - - # Query drafts with pagination - response = await client.query_drafts( - random_user["id"], - options={"limit": 1, "next": response["next"]}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel.cid - - # Cleanup try: - await channel2.delete() - except Exception: - pass + draft2 = {"text": "Draft in channel 2"} + await channel2.create_draft(draft2, random_user["id"]) + + # Query all drafts for the user + response = await client.query_drafts(random_user["id"]) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + + # Query drafts for a specific channel + response = await client.query_drafts( + random_user["id"], filter={"channel_cid": channel2.cid} + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + draft = response["drafts"][0] + assert draft["channel_cid"] == channel2.cid + assert draft["message"]["text"] == "Draft in channel 2" + + # Query drafts with sort + response = await client.query_drafts( + random_user["id"], + sort=[{"field": "created_at", "direction": SortOrder.ASC}], + ) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + assert response["drafts"][0]["channel_cid"] == channel.cid + assert response["drafts"][1]["channel_cid"] == channel2.cid + + # Query drafts with pagination + response = await client.query_drafts( + random_user["id"], + options={"limit": 1}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel2.cid + + assert response["next"] is not None + + # Query drafts with pagination + response = await client.query_drafts( + random_user["id"], + options={"limit": 1, "next": response["next"]}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel.cid + finally: + # Hard-delete via the synchronous channel.delete (HTTP DELETE) + # rather than the async-task delete_channels helper. Failures + # surface in CI logs instead of being swallowed silently. + try: + await channel2.delete(hard=True) + except Exception as exc: + import sys + + print( + f"[cleanup] channel {channel2.cid} delete failed: " + f"{exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) diff --git a/stream_chat/tests/conftest.py b/stream_chat/tests/conftest.py index b18ebda..911ac64 100644 --- a/stream_chat/tests/conftest.py +++ b/stream_chat/tests/conftest.py @@ -1,4 +1,5 @@ import os +import sys import uuid from typing import Dict, List @@ -25,6 +26,72 @@ def pytest_configure(config): config.addinivalue_line("markers", "incremental: mark test incremental") +def _warn_cleanup_failure(label: str, identifier: str, exc: BaseException) -> None: + """Surface cleanup failures in CI logs. + + Fixtures used to swallow every teardown exception with ``except Exception: pass``, + which silently leaked test channels / users / commands into the shared CI + app and eventually broke unrelated tests with stale-state assertions. We + can't fail the test on cleanup error (the test itself already passed) but + we can at least make the leak visible so the on-call dev knows where to + look. + """ + print( + f"[cleanup] {label} {identifier} failed: {exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + + +@pytest.fixture(scope="session", autouse=True) +def _purge_stale_test_channels(): + """Best-effort sweep of leaked test channels at session start AND end. + + Per-test fixtures tear down with synchronous ``channel.delete(hard=True)``, + but historical runs that aborted mid-test left orphans in the shared CI + app. This sweep targets channels tagged ``{"test": True, "language": + "python"}`` by the ``channel`` fixture below — anything that isn't + actively in use by another concurrent run gets hard-deleted. + + Session-scoped so it runs once per ``pytest`` invocation. autouse=True so + even tests that don't request a channel still benefit (and the next run's + quotas are healthy). + """ + + def sweep(client: StreamChat) -> None: + try: + response = client.query_channels( + {"test": True, "language": "python"}, + sort=[{"field": "created_at", "direction": -1}], + limit=30, + ) + except Exception as exc: + print( + f"[cleanup] sweep query_channels failed: {exc.__class__.__name__}: {exc}", + file=sys.stderr, + ) + return + cids = [c["channel"]["cid"] for c in response.get("channels", [])] + if not cids: + return + try: + client.delete_channels(cids, hard_delete=True) + print(f"[cleanup] swept {len(cids)} leaked test channels", file=sys.stderr) + except Exception as exc: + _warn_cleanup_failure("sweep delete_channels", str(len(cids)), exc) + + base_url = os.environ.get("STREAM_HOST") + options = {"base_url": base_url} if base_url else {} + client = StreamChat( + api_key=os.environ["STREAM_KEY"], + api_secret=os.environ["STREAM_SECRET"], + timeout=10, + **options, + ) + sweep(client) + yield + sweep(client) + + @pytest.fixture(scope="module") def client(): base_url = os.environ.get("STREAM_HOST") @@ -76,10 +143,13 @@ def channel(client: StreamChat, random_user: Dict): yield channel + # Use the synchronous channel.delete (HTTP DELETE) instead of + # client.delete_channels (returns task_id, races with subsequent tests + # querying the same app). Tag failures so CI logs show the leak source. try: - client.delete_channels([channel.cid], hard_delete=True) - except Exception: - pass + channel.delete(hard=True) + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) @pytest.fixture(scope="function") @@ -98,10 +168,10 @@ def command(client: StreamChat): ): try: client.delete_command(cmd["name"]) - except Exception: - pass - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("stale command", cmd["name"], exc) + except Exception as exc: + _warn_cleanup_failure("list_commands", "", exc) response = client.create_command( dict(name=str(uuid.uuid4()), description="My command") @@ -111,8 +181,8 @@ def command(client: StreamChat): try: client.delete_command(response["command"]["name"]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("command", response["command"]["name"], exc) @pytest.fixture(scope="module") @@ -135,8 +205,8 @@ def fellowship_of_the_ring(client: StreamChat): ] try: client.restore_users([m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("restore_users", "fellowship", exc) client.upsert_users(members) channel = client.channel( "team", "fellowship-of-the-ring", {"members": [m["id"] for m in members]} @@ -147,13 +217,13 @@ def fellowship_of_the_ring(client: StreamChat): try: channel.delete(hard=True) - hard_delete_users(client, [m["id"] for m in members]) - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("channel", channel.cid, exc) + hard_delete_users(client, [m["id"] for m in members]) def hard_delete_users(client: StreamChat, user_ids: List[str]): try: client.delete_users(user_ids, "hard", conversations="hard", messages="hard") - except Exception: - pass + except Exception as exc: + _warn_cleanup_failure("delete_users", ",".join(user_ids), exc) diff --git a/stream_chat/tests/test_draft.py b/stream_chat/tests/test_draft.py index 5d0d988..9aa299c 100644 --- a/stream_chat/tests/test_draft.py +++ b/stream_chat/tests/test_draft.py @@ -84,65 +84,80 @@ def test_query_drafts( draft1 = {"text": "Draft in channel 1"} channel.create_draft(draft1, random_user["id"]) - # Create another channel with a draft - channel2 = client.channel("messaging", str(uuid.uuid4())) - channel2.create(random_user["id"]) - - draft2 = {"text": "Draft in channel 2"} - channel2.create_draft(draft2, random_user["id"]) - - # Query all drafts for the user - response = client.query_drafts(random_user["id"]) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - - # Query drafts for a specific channel - response = client.query_drafts( - random_user["id"], filter={"channel_cid": channel2.cid} + # Create another channel with a draft. Tag with {"test": True, + # "language": "python"} so the session-level GC sweep in conftest + # catches it if cleanup at the end of this test doesn't run. + channel2 = client.channel( + "messaging", + str(uuid.uuid4()), + {"test": True, "language": "python"}, ) + channel2.create(random_user["id"]) - assert "drafts" in response - assert len(response["drafts"]) == 1 - draft = response["drafts"][0] - assert draft["channel_cid"] == channel2.cid - assert draft["message"]["text"] == "Draft in channel 2" - - # Query drafts with sort - response = client.query_drafts( - random_user["id"], - sort=[{"field": "created_at", "direction": SortOrder.ASC}], - ) - - assert "drafts" in response - assert len(response["drafts"]) == 2 - assert response["drafts"][0]["channel_cid"] == channel.cid - assert response["drafts"][1]["channel_cid"] == channel2.cid - - # Query drafts with pagination - response = client.query_drafts( - random_user["id"], - options={"limit": 1}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel2.cid - - assert response["next"] is not None - - # Query drafts with pagination - response = client.query_drafts( - random_user["id"], - options={"limit": 1, "next": response["next"]}, - ) - - assert "drafts" in response - assert len(response["drafts"]) == 1 - assert response["drafts"][0]["channel_cid"] == channel.cid - - # cleanup try: - channel2.delete() - except Exception: - pass + draft2 = {"text": "Draft in channel 2"} + channel2.create_draft(draft2, random_user["id"]) + + # Query all drafts for the user + response = client.query_drafts(random_user["id"]) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + + # Query drafts for a specific channel + response = client.query_drafts( + random_user["id"], filter={"channel_cid": channel2.cid} + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + draft = response["drafts"][0] + assert draft["channel_cid"] == channel2.cid + assert draft["message"]["text"] == "Draft in channel 2" + + # Query drafts with sort + response = client.query_drafts( + random_user["id"], + sort=[{"field": "created_at", "direction": SortOrder.ASC}], + ) + + assert "drafts" in response + assert len(response["drafts"]) == 2 + assert response["drafts"][0]["channel_cid"] == channel.cid + assert response["drafts"][1]["channel_cid"] == channel2.cid + + # Query drafts with pagination + response = client.query_drafts( + random_user["id"], + options={"limit": 1}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel2.cid + + assert response["next"] is not None + + # Query drafts with pagination + response = client.query_drafts( + random_user["id"], + options={"limit": 1, "next": response["next"]}, + ) + + assert "drafts" in response + assert len(response["drafts"]) == 1 + assert response["drafts"][0]["channel_cid"] == channel.cid + finally: + # Hard-delete via the synchronous channel.delete (HTTP DELETE) + # rather than the async-task delete_channels helper. Failures + # surface in CI logs instead of being swallowed silently. + try: + channel2.delete(hard=True) + except Exception as exc: + import sys + + print( + f"[cleanup] channel {channel2.cid} delete failed: " + f"{exc.__class__.__name__}: {exc}", + file=sys.stderr, + )