diff --git a/pyproject.toml b/pyproject.toml index c3add790..c8b27e4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -243,6 +243,9 @@ apify-client = false apify_fingerprint_datapoints = false crawlee = false +[tool.uv.sources] +crawlee = { git = "https://github.com/Mantisus/crawlee-python", branch = "queue-client-is-finished" } + # Run tasks with: uv run poe [tool.poe.tasks] clean = "rm -rf .coverage .pytest_cache .ruff_cache .ty_cache build dist htmlcov" diff --git a/src/apify/storage_clients/_apify/_request_queue_client.py b/src/apify/storage_clients/_apify/_request_queue_client.py index de673780..174c0ca3 100644 --- a/src/apify/storage_clients/_apify/_request_queue_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_client.py @@ -198,3 +198,7 @@ async def reclaim_request( @override async def is_empty(self) -> bool: return await self._implementation.is_empty() + + @override + async def is_finished(self) -> bool: + return await self._implementation.is_finished() diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index c196b098..b44f559f 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -292,8 +292,17 @@ async def is_empty(self) -> bool: # Check _list_head. # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. async with self._fetch_lock: - head = await self._list_head(limit=1) - return len(head.items) == 0 and not self._queue_has_locked_requests + return await self._is_empty() + + async def is_finished(self) -> bool: + """Specific implementation of this method for the RQ shared access mode.""" + async with self._fetch_lock: + return await self._is_empty() and not self._queue_has_locked_requests + + async def _is_empty(self) -> bool: + """Check whether anything is available to fetch. Lock-free core of `is_empty`, caller must hold the lock.""" + head = await self._list_head(limit=1) + return len(head.items) == 0 async def _get_metadata_estimate(self) -> RequestQueueMetadata: """Try to get cached metadata first. If multiple clients, fuse with global metadata. diff --git a/src/apify/storage_clients/_apify/_request_queue_single_client.py b/src/apify/storage_clients/_apify/_request_queue_single_client.py index 4029300d..d48a3319 100644 --- a/src/apify/storage_clients/_apify/_request_queue_single_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_single_client.py @@ -279,7 +279,11 @@ async def is_empty(self) -> bool: """Specific implementation of this method for the RQ single access mode.""" # Without the lock the `is_empty` is prone to falsely report True with some low probability race condition. await self._ensure_head_is_non_empty() - return not self._head_requests and not self._requests_in_progress + return not self._head_requests + + async def is_finished(self) -> bool: + """Specific implementation of this method for the RQ single access mode.""" + return await self.is_empty() and not self._requests_in_progress async def _ensure_head_is_non_empty(self) -> None: """Ensure that the queue head has requests if they are available in the queue.""" diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 955d9e81..64791401 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -651,6 +651,44 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_time assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}' +async def test_is_empty_and_is_finished(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None: + """Test `is_empty` and `is_finished` across the queue lifecycle.""" + + rq = request_queue_apify + Actor.log.info('Request queue opened') + + # Initially the queue is empty and finished. + is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) + assert is_empty is True, f'is_empty={is_empty}' + assert is_finished is True, f'is_finished={is_finished}' + + # After adding a request it is neither empty nor finished. + await rq.add_request('https://example.com') + is_empty = await poll_until_condition(rq.is_empty, condition=lambda e: e is False, timeout=rq_poll_timeout) + is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout) + assert is_empty is False, f'is_empty={is_empty}' + assert is_finished is False, f'is_finished={is_finished}' + + # Fetch the request without handling it. + request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) + assert request is not None, f'request={request}' + + # The queue is empty, because there is no request available for fetching. + is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) + assert is_empty is True, f'is_empty={is_empty}' + # The queue is not finished, because there is a request being processed. + is_finished = await poll_until_condition(rq.is_finished, condition=lambda f: f is False, timeout=rq_poll_timeout) + assert is_finished is False, f'is_finished={is_finished}' + + # After marking the request as handled the queue is empty and finished again. + await rq.mark_request_as_handled(request) + is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) + assert is_empty is True, f'is_empty={is_empty}' + assert is_finished is True, f'is_finished={is_finished}' + + async def test_large_batch_operations( request_queue_apify: RequestQueue, rq_poll_timeout: int, diff --git a/uv.lock b/uv.lock index bdcf47c2..e781dce0 100644 --- a/uv.lock +++ b/uv.lock @@ -84,7 +84,7 @@ dev = [ requires-dist = [ { name = "apify-client", specifier = ">=3.0.0,<4.0.0" }, { name = "cachetools", specifier = ">=5.5.0" }, - { name = "crawlee", specifier = ">=1.0.4,<2.0.0" }, + { name = "crawlee", git = "https://github.com/Mantisus/crawlee-python?branch=queue-client-is-finished" }, { name = "cryptography", specifier = ">=42.0.0" }, { name = "impit", specifier = ">=0.8.0" }, { name = "lazy-object-proxy", specifier = ">=1.11.0" }, @@ -101,7 +101,7 @@ provides-extras = ["scrapy"] dev = [ { name = "black", specifier = ">=24.3.0" }, { name = "build", specifier = "<2.0.0" }, - { name = "crawlee", extras = ["parsel"] }, + { name = "crawlee", extras = ["parsel"], git = "https://github.com/Mantisus/crawlee-python?branch=queue-client-is-finished" }, { name = "dycw-pytest-only", specifier = "<3.0.0" }, { name = "griffe" }, { name = "poethepoet", specifier = "<1.0.0" }, @@ -536,8 +536,8 @@ toml = [ [[package]] name = "crawlee" -version = "1.7.2" -source = { registry = "https://pypi.org/simple" } +version = "1.7.3" +source = { git = "https://github.com/Mantisus/crawlee-python?branch=queue-client-is-finished#6433c378b58ffa8d233d3c232b05425bb6ade1a5" } dependencies = [ { name = "async-timeout" }, { name = "cachetools" }, @@ -553,10 +553,6 @@ dependencies = [ { name = "typing-extensions" }, { name = "yarl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/80/19/b2ca54efbdad4770a3889aa788bd9a738a754b8b8889a3f7413cc40f25c3/crawlee-1.7.2.tar.gz", hash = "sha256:362f3f9e43d7c28c8c0133f8787ad2ff77d159ff205b9a72f2013ece882298ac", size = 287966, upload-time = "2026-06-04T08:14:59.459Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/f5/cd/00961c23ce321ba16343ae53cc08e23f088ff7c1ea5365f64cae3c71d349/crawlee-1.7.2-py3-none-any.whl", hash = "sha256:dd22b2c1fa6b39312222ec0e732344b22eb26aa53d33758ec9a1a8ad16725d4d", size = 370707, upload-time = "2026-06-04T08:14:57.574Z" }, -] [package.optional-dependencies] parsel = [