Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ apify-client = false
apify_fingerprint_datapoints = false
crawlee = false

[tool.uv.sources]
crawlee = { git = "https://github.com/Mantisus/crawlee-python", branch = "queue-client-is-finished" }

# Run tasks with: uv run poe <task>
[tool.poe.tasks]
clean = "rm -rf .coverage .pytest_cache .ruff_cache .ty_cache build dist htmlcov"
Expand Down
4 changes: 4 additions & 0 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,7 @@ async def reclaim_request(
@override
async def is_empty(self) -> bool:
return await self._implementation.is_empty()

@override
async def is_finished(self) -> bool:
return await self._implementation.is_finished()
13 changes: 11 additions & 2 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,17 @@ async def is_empty(self) -> bool:
# Check _list_head.
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.
async with self._fetch_lock:
head = await self._list_head(limit=1)
return len(head.items) == 0 and not self._queue_has_locked_requests
return await self._is_empty()

async def is_finished(self) -> bool:
"""Specific implementation of this method for the RQ shared access mode."""
async with self._fetch_lock:
return await self._is_empty() and not self._queue_has_locked_requests

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on await self._is_empty() is being evaluated first (it calls _list_head, which populates self._queue_has_locked_requests). A reorder would break the finished detection. Maybe we could add a comment noting the ordering dependency.


async def _is_empty(self) -> bool:
"""Check whether anything is available to fetch. Lock-free core of `is_empty`, caller must hold the lock."""
head = await self._list_head(limit=1)
return len(head.items) == 0

async def _get_metadata_estimate(self) -> RequestQueueMetadata:
"""Try to get cached metadata first. If multiple clients, fuse with global metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ async def is_empty(self) -> bool:
"""Specific implementation of this method for the RQ single access mode."""
# Without the lock the `is_empty` is prone to falsely report True with some low probability race condition.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is now outdated and misleading. Could you update it?

await self._ensure_head_is_non_empty()
return not self._head_requests and not self._requests_in_progress
return not self._head_requests

async def is_finished(self) -> bool:
"""Specific implementation of this method for the RQ single access mode."""
return await self.is_empty() and not self._requests_in_progress

async def _ensure_head_is_non_empty(self) -> None:
"""Ensure that the queue head has requests if they are available in the queue."""
Expand Down
38 changes: 38 additions & 0 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,44 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_time
assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}'


async def test_is_empty_and_is_finished(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the most part, this duplicates the existing test_request_queue_is_finished test. Could we consolidate them into one? And parametrized it single+shared.

"""Test `is_empty` and `is_finished` across the queue lifecycle."""

rq = request_queue_apify
Actor.log.info('Request queue opened')

# Initially the queue is empty and finished.
is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2)
assert is_empty is True, f'is_empty={is_empty}'
assert is_finished is True, f'is_finished={is_finished}'

# After adding a request it is neither empty nor finished.
await rq.add_request('https://example.com')
is_empty = await poll_until_condition(rq.is_empty, condition=lambda e: e is False, timeout=rq_poll_timeout)
is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout)
assert is_empty is False, f'is_empty={is_empty}'
assert is_finished is False, f'is_finished={is_finished}'

# Fetch the request without handling it.
request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2)
assert request is not None, f'request={request}'

# The queue is empty, because there is no request available for fetching.
is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
assert is_empty is True, f'is_empty={is_empty}'
# The queue is not finished, because there is a request being processed.
is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout)
assert is_finished is False, f'is_finished={is_finished}'

# After marking the request as handled the queue is empty and finished again.
await rq.mark_request_as_handled(request)
is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2)
is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2)
assert is_empty is True, f'is_empty={is_empty}'
assert is_finished is True, f'is_finished={is_finished}'


async def test_large_batch_operations(
request_queue_apify: RequestQueue,
rq_poll_timeout: int,
Expand Down
12 changes: 4 additions & 8 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading