Skip to content

Commit a1b0df7

Browse files
committed
test: address review findings in polling helpers
1 parent dc659db commit a1b0df7

2 files changed

Lines changed: 19 additions & 63 deletions

File tree

tests/integration/_utils.py

Lines changed: 14 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import asyncio
44
import inspect
5-
import logging
65
import secrets
76
import string
87
import time
@@ -15,8 +14,6 @@
1514
if TYPE_CHECKING:
1615
from collections.abc import Awaitable, Callable
1716

18-
logger = logging.getLogger(__name__)
19-
2017
# Environment variable names for test configuration
2118
TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
2219
TOKEN_ENV_VAR_2 = 'APIFY_TEST_USER_2_API_TOKEN'
@@ -104,7 +101,7 @@ async def maybe_await(value: Awaitable[T] | T) -> T:
104101
"""
105102
if inspect.isawaitable(value):
106103
return await cast('Awaitable[T]', value)
107-
return cast('T', value)
104+
return value
108105

109106

110107
async def maybe_sleep(seconds: float, *, is_async: bool) -> None:
@@ -115,53 +112,6 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None:
115112
time.sleep(seconds) # noqa: ASYNC251
116113

117114

118-
@overload
119-
async def call_with_exp_backoff(
120-
fn: Callable[[], Awaitable[T]],
121-
condition: Callable[[T], bool] = ...,
122-
*,
123-
max_retries: int = ...,
124-
base_delay: float = ...,
125-
) -> T: ...
126-
@overload
127-
async def call_with_exp_backoff(
128-
fn: Callable[[], T],
129-
condition: Callable[[T], bool] = ...,
130-
*,
131-
max_retries: int = ...,
132-
base_delay: float = ...,
133-
) -> T: ...
134-
async def call_with_exp_backoff(
135-
fn: Callable[[], Awaitable[T] | T],
136-
condition: Callable[[T], bool] = bool,
137-
*,
138-
max_retries: int = 5,
139-
base_delay: float = 1.0,
140-
) -> T:
141-
"""Call `fn`, retrying with exponential backoff until `condition(result)` is True.
142-
143-
Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to
144-
`max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is
145-
returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion.
146-
147-
This is useful for eventually-consistent APIs where a freshly created resource may take a moment to become
148-
visible. The default condition checks for a truthy result. Pass `max_retries=0` to call `fn` exactly once.
149-
150-
Unlike `poll_until_condition`, the delay between attempts grows exponentially rather than staying constant.
151-
"""
152-
result = await maybe_await(fn())
153-
for attempt in range(max_retries):
154-
if condition(result):
155-
return result
156-
delay = base_delay * 2**attempt
157-
logger.info(
158-
'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries
159-
)
160-
await asyncio.sleep(delay)
161-
result = await maybe_await(fn())
162-
return result
163-
164-
165115
@overload
166116
async def poll_until_condition(
167117
fn: Callable[[], Awaitable[T]],
@@ -192,8 +142,7 @@ async def poll_until_condition(
192142
assertion. The default condition checks for a truthy result.
193143
194144
Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly
195-
created resource appearing in a listing) that may take a variable amount of time to propagate. Unlike
196-
`call_with_exp_backoff`, the interval between polls stays constant.
145+
created resource appearing in a listing) that may take a variable amount of time to propagate.
197146
"""
198147
deadline = time.monotonic() + timeout
199148
result = await maybe_await(fn())
@@ -228,8 +177,8 @@ async def collect_iterate_until_present(
228177
iterator_factory: No-arg callable returning a fresh iterator on each call.
229178
expected_ids: IDs that must all appear in the collected items.
230179
item_type: Asserted to match the runtime type of each yielded item.
231-
is_async: Whether the iterator is async.
232-
max_attempts: Maximum number of polling rounds.
180+
is_async: Whether the iterator is async (and so are sleeps).
181+
max_attempts: Maximum number of polling rounds, guaranteed regardless of how long each drain takes.
233182
interval: Seconds to sleep between attempts.
234183
235184
Returns:
@@ -251,12 +200,16 @@ async def drain() -> list[_HasIdT]:
251200
collected.append(item)
252201
return collected
253202

254-
return await poll_until_condition(
255-
drain,
256-
lambda collected: expected_ids.issubset(item.id for item in collected),
257-
timeout=max_attempts * interval,
258-
poll_interval=interval,
259-
)
203+
# Loop on attempt count rather than `poll_until_condition`'s wall-clock deadline: each drain performs
204+
# paginated HTTP calls, and charging that time against a deadline would shrink the number of retries
205+
# under load — exactly when the eventual-consistency tolerance is needed most.
206+
collected = await drain()
207+
for _ in range(max_attempts - 1):
208+
if expected_ids.issubset(item.id for item in collected):
209+
break
210+
await maybe_sleep(interval, is_async=is_async)
211+
collected = await drain()
212+
return collected
260213

261214

262215
# ============================================================================

tests/integration/test_request_queue.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,10 +567,13 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs
567567
# is acknowledged before all three lock writes are visible to subsequent reads, so unlocking
568568
# immediately can see fewer locks than were just acquired. Locked requests are excluded from the
569569
# queue head, so poll `list_head` until none of the locked requests reappear there, rather than
570-
# guessing a fixed sleep.
570+
# guessing a fixed sleep. Note this is a best-effort mitigation: the head read and the unlock may
571+
# be served by different replicas, so convergence of one does not guarantee the other — but it
572+
# narrows the race window enough in practice.
571573
async def all_locks_visible() -> bool:
572574
head = await maybe_await(rq_client.list_head(limit=5))
573-
return isinstance(head, RequestQueueHead) and locked_ids.isdisjoint(item.id for item in head.items)
575+
assert isinstance(head, RequestQueueHead)
576+
return locked_ids.isdisjoint(item.id for item in head.items)
574577

575578
await poll_until_condition(all_locks_visible, timeout=30, poll_interval=1)
576579

0 commit comments

Comments
 (0)