-
Notifications
You must be signed in to change notification settings - Fork 24
feat: Implement the is_finished method in the ApifyStorageClient
#1008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -279,7 +279,11 @@ async def is_empty(self) -> bool: | |
| """Specific implementation of this method for the RQ single access mode.""" | ||
| # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This comment is now outdated and misleading. Could you update it? |
||
| await self._ensure_head_is_non_empty() | ||
| return not self._head_requests and not self._requests_in_progress | ||
| return not self._head_requests | ||
|
|
||
| async def is_finished(self) -> bool: | ||
| """Specific implementation of this method for the RQ single access mode.""" | ||
| return await self.is_empty() and not self._requests_in_progress | ||
|
|
||
| async def _ensure_head_is_non_empty(self) -> None: | ||
| """Ensure that the queue head has requests if they are available in the queue.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -651,6 +651,44 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_time | |
| assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}' | ||
|
|
||
|
|
||
| async def test_is_empty_and_is_finished(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For the most part, this duplicates the existing |
||
| """Test `is_empty` and `is_finished` across the queue lifecycle.""" | ||
|
|
||
| rq = request_queue_apify | ||
| Actor.log.info('Request queue opened') | ||
|
|
||
| # Initially the queue is empty and finished. | ||
| is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) | ||
| is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) | ||
| assert is_empty is True, f'is_empty={is_empty}' | ||
| assert is_finished is True, f'is_finished={is_finished}' | ||
|
|
||
| # After adding a request it is neither empty nor finished. | ||
| await rq.add_request('https://example.com') | ||
| is_empty = await poll_until_condition(rq.is_empty, condition=lambda e: e is False, timeout=rq_poll_timeout) | ||
| is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout) | ||
| assert is_empty is False, f'is_empty={is_empty}' | ||
| assert is_finished is False, f'is_finished={is_finished}' | ||
|
|
||
| # Fetch the request without handling it. | ||
| request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) | ||
| assert request is not None, f'request={request}' | ||
|
|
||
| # The queue is empty, because there is no request available for fetching. | ||
| is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) | ||
| assert is_empty is True, f'is_empty={is_empty}' | ||
| # The queue is not finished, because there is a request being processed. | ||
| is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout) | ||
| assert is_finished is False, f'is_finished={is_finished}' | ||
|
|
||
| # After marking the request as handled the queue is empty and finished again. | ||
| await rq.mark_request_as_handled(request) | ||
| is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) | ||
| is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) | ||
| assert is_empty is True, f'is_empty={is_empty}' | ||
| assert is_finished is True, f'is_finished={is_finished}' | ||
|
|
||
|
|
||
| async def test_large_batch_operations( | ||
| request_queue_apify: RequestQueue, | ||
| rq_poll_timeout: int, | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This relies on `await self._is_empty()` being evaluated first (it calls `_list_head`, which populates `self._queue_has_locked_requests`). A reorder would break the finished detection. Maybe we could add a comment noting the ordering dependency.