|
4 | 4 | import secrets |
5 | 5 | import string |
6 | 6 | import time |
| 7 | +from collections.abc import AsyncIterator, Iterator |
7 | 8 | from dataclasses import dataclass |
8 | | -from typing import TYPE_CHECKING, Any, TypeVar, overload |
| 9 | +from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload |
9 | 10 |
|
10 | 11 | import pytest |
11 | 12 |
|
12 | 13 | if TYPE_CHECKING: |
13 | | - from collections.abc import Coroutine |
| 14 | + from collections.abc import Callable, Coroutine |
14 | 15 |
|
15 | 16 | # Environment variable names for test configuration |
16 | 17 | TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN' |
|
20 | 21 | T = TypeVar('T') |
21 | 22 |
|
22 | 23 |
|
| 24 | +class _HasId(Protocol): |
| 25 | + """Items returned by collection `iterate()` endpoints all expose `.id`.""" |
| 26 | + |
| 27 | + @property |
| 28 | + def id(self) -> str: ... |
| 29 | + |
| 30 | + |
| 31 | +_HasIdT = TypeVar('_HasIdT', bound=_HasId) |
| 32 | + |
| 33 | + |
23 | 34 | # ============================================================================ |
24 | 35 | # Data classes for test fixtures |
25 | 36 | # ============================================================================ |
@@ -108,6 +119,55 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None: |
108 | 119 | time.sleep(seconds) # noqa: ASYNC251 |
109 | 120 |
|
110 | 121 |
|
| 122 | +async def collect_iterate_until_present( |
| 123 | + iterator_factory: Callable[[], Iterator[_HasIdT] | AsyncIterator[_HasIdT]], |
| 124 | + expected_ids: set[str], |
| 125 | + *, |
| 126 | + item_type: type[_HasIdT], |
| 127 | + is_async: bool, |
| 128 | + max_attempts: int = 5, |
| 129 | + interval: float = 1.0, |
| 130 | +) -> list[_HasIdT]: |
| 131 | + """Drain a collection `iterate()` until every expected ID is present. |
| 132 | +
|
| 133 | + Handles eventual consistency on listing endpoints: under parallel load a freshly |
| 134 | + created resource may not appear in the listing for a short window. Each attempt |
| 135 | + builds a fresh iterator via `iterator_factory`, drains it, and breaks early once |
| 136 | + `expected_ids` is a subset of the collected items' `.id` values. The most recent |
| 137 | + collection is returned regardless of whether the condition was met, so the caller |
| 138 | + can run its own assertion with a helpful failure message. |
| 139 | +
|
| 140 | + Args: |
| 141 | + iterator_factory: No-arg callable returning a fresh iterator on each call. |
| 142 | + expected_ids: IDs that must all appear in the collected items. |
| 143 | + item_type: Asserted to match the runtime type of each yielded item. |
| 144 | + is_async: Whether the iterator is async (and so are sleeps). |
| 145 | + max_attempts: Maximum number of polling rounds. |
| 146 | + interval: Seconds to sleep before each attempt. |
| 147 | +
|
| 148 | + Returns: |
| 149 | + The most recently collected items. |
| 150 | + """ |
| 151 | + collected: list[_HasIdT] = [] |
| 152 | + for _ in range(max_attempts): |
| 153 | + await maybe_sleep(interval, is_async=is_async) |
| 154 | + iterator = iterator_factory() |
| 155 | + collected = [] |
| 156 | + if is_async: |
| 157 | + assert isinstance(iterator, AsyncIterator) |
| 158 | + async for item in iterator: |
| 159 | + assert isinstance(item, item_type) |
| 160 | + collected.append(item) |
| 161 | + else: |
| 162 | + assert isinstance(iterator, Iterator) |
| 163 | + for item in iterator: |
| 164 | + assert isinstance(item, item_type) |
| 165 | + collected.append(item) |
| 166 | + if expected_ids.issubset(item.id for item in collected): |
| 167 | + break |
| 168 | + return collected |
| 169 | + |
| 170 | + |
111 | 171 | # ============================================================================ |
112 | 172 | # Pytest markers and parametrization |
113 | 173 | # ============================================================================ |
|
0 commit comments