Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 72 additions & 16 deletions stream_chat/tests/async_chat/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import sys
import uuid
from typing import Dict, List

Expand All @@ -26,13 +27,66 @@ def pytest_configure(config):
config.addinivalue_line("markers", "incremental: mark test incremental")


def _warn_cleanup_failure(label: str, identifier: str, exc: BaseException) -> None:
"""See sync conftest for rationale; mirrored to keep both suites consistent."""
print(
f"[cleanup] {label} {identifier} failed: {exc.__class__.__name__}: {exc}",
file=sys.stderr,
)


@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()


@pytest.fixture(scope="session", autouse=True)
def _purge_stale_test_channels_async(event_loop):
"""Async mirror of the sync session sweep; queries channels tagged
``{"test": True, "language": "python"}`` and hard-deletes them at
session start and end so leaks don't compound across runs."""

async def sweep() -> None:
base_url = os.environ.get("STREAM_HOST")
options = {"base_url": base_url} if base_url else {}
async with StreamChatAsync(
api_key=os.environ["STREAM_KEY"],
api_secret=os.environ["STREAM_SECRET"],
timeout=10,
**options,
) as client:
try:
response = await client.query_channels(
{"test": True, "language": "python"},
sort=[{"field": "created_at", "direction": -1}],
limit=30,
)
except Exception as exc:
print(
f"[cleanup] sweep query_channels failed: "
f"{exc.__class__.__name__}: {exc}",
file=sys.stderr,
)
return
cids = [c["channel"]["cid"] for c in response.get("channels", [])]
if not cids:
return
try:
await client.delete_channels(cids, hard_delete=True)
print(
f"[cleanup] swept {len(cids)} leaked test channels",
file=sys.stderr,
)
except Exception as exc:
_warn_cleanup_failure("sweep delete_channels", str(len(cids)), exc)

event_loop.run_until_complete(sweep())
yield
event_loop.run_until_complete(sweep())


@pytest.fixture(scope="function", autouse=True)
@pytest.mark.asyncio
async def client():
Expand Down Expand Up @@ -85,10 +139,12 @@ async def channel(client: StreamChatAsync, random_user: Dict):
await channel.create(random_user["id"])
yield channel

# Synchronous channel.delete (HTTP DELETE), not the async-task
# delete_channels — see sync conftest for the leak rationale.
try:
await client.delete_channels([channel.cid], hard_delete=True)
except Exception:
pass
await channel.delete(hard=True)
except Exception as exc:
_warn_cleanup_failure("channel", channel.cid, exc)


@pytest.fixture(scope="function")
Expand All @@ -107,10 +163,10 @@ async def command(client: StreamChatAsync):
):
try:
await client.delete_command(cmd["name"])
except Exception:
pass
except Exception:
pass
except Exception as exc:
_warn_cleanup_failure("stale command", cmd["name"], exc)
except Exception as exc:
_warn_cleanup_failure("list_commands", "<sweep>", exc)

response = await client.create_command(
dict(name=str(uuid.uuid4()), description="My command")
Expand All @@ -120,8 +176,8 @@ async def command(client: StreamChatAsync):

try:
await client.delete_command(response["command"]["name"])
except Exception:
pass
except Exception as exc:
_warn_cleanup_failure("command", response["command"]["name"], exc)


@pytest.fixture(scope="function")
Expand All @@ -145,8 +201,8 @@ async def fellowship_of_the_ring(client: StreamChatAsync):
]
try:
await client.restore_users([m["id"] for m in members])
except Exception:
pass
except Exception as exc:
_warn_cleanup_failure("restore_users", "fellowship", exc)
await client.upsert_users(members)
channel = client.channel(
"team", "fellowship-of-the-ring", {"members": [m["id"] for m in members]}
Expand All @@ -155,15 +211,15 @@ async def fellowship_of_the_ring(client: StreamChatAsync):
yield
try:
await channel.delete(hard=True)
await hard_delete_users(client, [m["id"] for m in members])
except Exception:
pass
except Exception as exc:
_warn_cleanup_failure("channel", channel.cid, exc)
await hard_delete_users(client, [m["id"] for m in members])


async def hard_delete_users(client: StreamChatAsync, user_ids: List[str]):
try:
await client.delete_users(
user_ids, "hard", conversations="hard", messages="hard"
)
except Exception:
pass
except Exception as exc:
_warn_cleanup_failure("delete_users", ",".join(user_ids), exc)
133 changes: 74 additions & 59 deletions stream_chat/tests/async_chat/test_draft.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,65 +84,80 @@ async def test_query_drafts(
draft1 = {"text": "Draft in channel 1"}
await channel.create_draft(draft1, random_user["id"])

# Create another channel with a draft
channel2 = client.channel("messaging", str(uuid.uuid4()))
await channel2.create(random_user["id"])

draft2 = {"text": "Draft in channel 2"}
await channel2.create_draft(draft2, random_user["id"])

# Query all drafts for the user
response = await client.query_drafts(random_user["id"])

assert "drafts" in response
assert len(response["drafts"]) == 2

# Query drafts for a specific channel
response = await client.query_drafts(
random_user["id"], filter={"channel_cid": channel2.cid}
# Create another channel with a draft. Tag with {"test": True,
# "language": "python"} so the session-level GC sweep in conftest
# catches it if cleanup at the end of this test doesn't run.
channel2 = client.channel(
"messaging",
str(uuid.uuid4()),
{"test": True, "language": "python"},
)
await channel2.create(random_user["id"])

assert "drafts" in response
assert len(response["drafts"]) == 1
draft = response["drafts"][0]
assert draft["channel_cid"] == channel2.cid
assert draft["message"]["text"] == "Draft in channel 2"

# Query drafts with sort
response = await client.query_drafts(
random_user["id"],
sort=[{"field": "created_at", "direction": SortOrder.ASC}],
)

assert "drafts" in response
assert len(response["drafts"]) == 2
assert response["drafts"][0]["channel_cid"] == channel.cid
assert response["drafts"][1]["channel_cid"] == channel2.cid

# Query drafts with pagination
response = await client.query_drafts(
random_user["id"],
options={"limit": 1},
)

assert "drafts" in response
assert len(response["drafts"]) == 1
assert response["drafts"][0]["channel_cid"] == channel2.cid

assert response["next"] is not None

# Query drafts with pagination
response = await client.query_drafts(
random_user["id"],
options={"limit": 1, "next": response["next"]},
)

assert "drafts" in response
assert len(response["drafts"]) == 1
assert response["drafts"][0]["channel_cid"] == channel.cid

# Cleanup
try:
await channel2.delete()
except Exception:
pass
draft2 = {"text": "Draft in channel 2"}
await channel2.create_draft(draft2, random_user["id"])

# Query all drafts for the user
response = await client.query_drafts(random_user["id"])

assert "drafts" in response
assert len(response["drafts"]) == 2

# Query drafts for a specific channel
response = await client.query_drafts(
random_user["id"], filter={"channel_cid": channel2.cid}
)

assert "drafts" in response
assert len(response["drafts"]) == 1
draft = response["drafts"][0]
assert draft["channel_cid"] == channel2.cid
assert draft["message"]["text"] == "Draft in channel 2"

# Query drafts with sort
response = await client.query_drafts(
random_user["id"],
sort=[{"field": "created_at", "direction": SortOrder.ASC}],
)

assert "drafts" in response
assert len(response["drafts"]) == 2
assert response["drafts"][0]["channel_cid"] == channel.cid
assert response["drafts"][1]["channel_cid"] == channel2.cid

# Query drafts with pagination
response = await client.query_drafts(
random_user["id"],
options={"limit": 1},
)

assert "drafts" in response
assert len(response["drafts"]) == 1
assert response["drafts"][0]["channel_cid"] == channel2.cid

assert response["next"] is not None

# Query drafts with pagination
response = await client.query_drafts(
random_user["id"],
options={"limit": 1, "next": response["next"]},
)

assert "drafts" in response
assert len(response["drafts"]) == 1
assert response["drafts"][0]["channel_cid"] == channel.cid
finally:
# Hard-delete via the synchronous channel.delete (HTTP DELETE)
# rather than the async-task delete_channels helper. Failures
# surface in CI logs instead of being swallowed silently.
try:
await channel2.delete(hard=True)
except Exception as exc:
import sys

print(
f"[cleanup] channel {channel2.cid} delete failed: "
f"{exc.__class__.__name__}: {exc}",
file=sys.stderr,
)
Loading
Loading