From d8ddc933188ecd5de2577c39aba8bb89d7e50450 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Sun, 21 Jun 2026 20:56:32 +0000 Subject: [PATCH] add `is_finished` method to `RequestQueueClient` --- .../_base/_request_queue_client.py | 10 +++- .../_file_system/_request_queue_client.py | 57 +++++-------------- .../_memory/_request_queue_client.py | 14 ++++- .../_redis/_request_queue_client.py | 17 ++++-- .../_sql/_request_queue_client.py | 29 +++++++++- src/crawlee/storages/_request_queue.py | 17 +++--- .../crawlers/_basic/test_basic_crawler.py | 35 ++++++++++-- tests/unit/storages/test_request_queue.py | 20 +++++-- 8 files changed, 126 insertions(+), 73 deletions(-) diff --git a/src/crawlee/storage_clients/_base/_request_queue_client.py b/src/crawlee/storage_clients/_base/_request_queue_client.py index a993fcfdb3..df0fe996e5 100644 --- a/src/crawlee/storage_clients/_base/_request_queue_client.py +++ b/src/crawlee/storage_clients/_base/_request_queue_client.py @@ -124,8 +124,16 @@ async def reclaim_request( @abstractmethod async def is_empty(self) -> bool: - """Check if the request queue is empty. + """Check if the request queue is empty. That means there are no requests available to fetch. Returns: True if the request queue is empty, False otherwise. """ + + async def is_finished(self) -> bool: + """Check if the request queue is finished. That means the queue is empty and no requests are being processed. + + Returns: + True if the request queue is finished, False otherwise. + """ + return await self.is_empty() diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 024a50b8a9..0b5ec6b9e3 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -114,9 +114,6 @@ def __init__( self._request_cache_needs_refresh = True """Flag indicating whether the cache needs to be refreshed from filesystem.""" - self._is_empty_cache: bool | None = None - """Cache for is_empty result: None means unknown, True/False is cached state.""" - self._state = recoverable_state """Recoverable state to maintain request ordering, in-progress status, and handled status.""" @@ -291,9 +288,6 @@ async def drop(self) -> None: self._request_cache.clear() self._request_cache_needs_refresh = True - # Invalidate is_empty cache. - self._is_empty_cache = None - @override async def purge(self) -> None: async with self._lock: @@ -315,9 +309,6 @@ async def purge(self) -> None: new_total_request_count=0, ) - # Invalidate is_empty cache. - self._is_empty_cache = None - @override async def add_batch_of_requests( self, @@ -326,7 +317,6 @@ async def add_batch_of_requests( forefront: bool = False, ) -> AddRequestsResponse: async with self._lock: - self._is_empty_cache = None new_total_request_count = self._metadata.total_request_count new_pending_request_count = self._metadata.pending_request_count processed_requests = list[ProcessedRequest]() @@ -435,9 +425,6 @@ async def add_batch_of_requests( if forefront: self._request_cache_needs_refresh = True - # Invalidate is_empty cache. - self._is_empty_cache = None - return AddRequestsResponse( processed_requests=processed_requests, unprocessed_requests=unprocessed_requests, @@ -482,7 +469,6 @@ async def fetch_next_request(self) -> Request | None: @override async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: async with self._lock: - self._is_empty_cache = None state = self._state.current_value # Check if the request is in progress. @@ -530,7 +516,6 @@ async def reclaim_request( forefront: bool = False, ) -> ProcessedRequest | None: async with self._lock: - self._is_empty_cache = None state = self._state.current_value # Check if the request is in progress. @@ -586,39 +571,23 @@ async def reclaim_request( @override async def is_empty(self) -> bool: async with self._lock: - # If we have a cached value, return it immediately. - if self._is_empty_cache is not None: - return self._is_empty_cache - - state = self._state.current_value - - # If there are in-progress requests, return False immediately. - if len(state.in_progress_requests) > 0: - self._is_empty_cache = False - return False - - # If we have a cached requests, check them first (fast path). - if self._request_cache: - for req in self._request_cache: - if req.unique_key not in state.handled_requests: - self._is_empty_cache = False - return False - self._is_empty_cache = True - return len(state.in_progress_requests) == 0 - - # Fallback: check state for unhandled requests. await self._update_metadata(update_accessed_at=True) - # Check if there are any requests that are not handled - all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys()) - unhandled_requests = all_requests - state.handled_requests + # The queue is empty when nothing is available to fetch, i.e. every unhandled request is + # currently in progress. + return self._metadata.pending_request_count - len(self._state.current_value.in_progress_requests) <= 0 + + @override + async def is_finished(self) -> bool: + # If anything is still available to fetch, the queue is not finished. + if not await self.is_empty(): + return False - if unhandled_requests: - self._is_empty_cache = False - return False + async with self._lock: + await self._update_metadata(update_accessed_at=True) - self._is_empty_cache = True - return True + # The queue is finished when there are no pending requests and no in-progress requests. + return self._metadata.pending_request_count == 0 and not self._state.current_value.in_progress_requests def _get_request_path(self, unique_key: str) -> Path: """Get the path to a specific request file. diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index 90b47c63d8..2bcdd60766 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -314,8 +314,18 @@ async def is_empty(self) -> bool: """ await self._update_metadata(update_accessed_at=True) - # Queue is empty if there are no pending requests and no requests in progress. - return len(self._pending_requests) == 0 and len(self._in_progress_requests) == 0 + # Queue is empty if there are no pending requests. + return len(self._pending_requests) == 0 + + @override + async def is_finished(self) -> bool: + """Check if the queue is finished. + + Returns: + True if the queue is finished, False otherwise. + """ + # Queue is finished if it is empty and there are no in-progress requests. + return await self.is_empty() and len(self._in_progress_requests) == 0 async def _update_metadata( self, diff --git a/src/crawlee/storage_clients/_redis/_request_queue_client.py b/src/crawlee/storage_clients/_redis/_request_queue_client.py index b2defb752e..4cefa87789 100644 --- a/src/crawlee/storage_clients/_redis/_request_queue_client.py +++ b/src/crawlee/storage_clients/_redis/_request_queue_client.py @@ -496,11 +496,7 @@ async def reclaim_request( @retry_on_error(RedisError) @override async def is_empty(self) -> bool: - """Check if the queue is empty. - - Returns: - True if the queue is empty, False otherwise. - """ + # Requests buffered for fetching mean the queue is not empty. if self._pending_fetch_cache: return False @@ -509,9 +505,18 @@ async def is_empty(self) -> bool: await self._reclaim_stale_requests() self._next_reclaim_stale = datetime.now(tz=timezone.utc) + self._RECLAIM_INTERVAL + # Check if there are any requests in the queue. + requests_in_queue = await await_redis_response(self._redis.llen(self._queue_key)) + return requests_in_queue == 0 + + @retry_on_error(RedisError) + @override + async def is_finished(self) -> bool: + is_empty = await self.is_empty() + metadata = await self.get_metadata() - return metadata.pending_request_count == 0 + return is_empty and metadata.pending_request_count == 0 async def _load_scripts(self) -> None: """Ensure Lua scripts are loaded in Redis.""" diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index e3e75bf3c3..99630a3bd8 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -595,10 +595,34 @@ async def reclaim_request( @retry_on_error(SQLAlchemyError) @override async def is_empty(self) -> bool: - # Check in-memory cache for requests + # Requests buffered for fetching mean the queue is not empty. if self._pending_fetch_cache: return False + now = datetime.now(timezone.utc) + + # Check if there are any unhandled requests that are not blocked. + async with self.get_session(with_simple_commit=True) as session: + stmt = select( + exists().where( + self._ITEM_TABLE.request_queue_id == self._id, + self._ITEM_TABLE.is_handled == False, # noqa: E712 + or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), + ) + ) + result = await session.execute(stmt) + + await self._add_buffer_record(session) + + return not result.scalar() + + @retry_on_error(SQLAlchemyError) + @override + async def is_finished(self) -> bool: + # If the queue is not empty, it is not finished + if not await self.is_empty(): + return False + metadata = await self.get_metadata() async with self.get_session(with_simple_commit=True) as session: @@ -629,7 +653,8 @@ async def is_empty(self) -> bool: has_pending_buffer_updates = buffer_result.scalar() await self._add_buffer_record(session) - # If there are no pending requests and no buffered updates, the queue is empty + + # If there are no pending requests and no buffered updates, the queue is finished return not has_pending_buffer_updates # There are pending requests (may be inaccurate), ensure recalculated metadata diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index eaa93785c9..e29bda4af9 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -316,9 +316,9 @@ async def reclaim_request( async def is_empty(self) -> bool: """Check if the request queue is empty. - An empty queue means that there are no requests currently in the queue, either pending or being processed. - However, this does not necessarily mean that the crawling operation is finished, as there still might be - tasks that could add additional requests to the queue. + An empty queue means that there are no requests currently available to fetch. However, this does not + necessarily mean that the crawling operation is finished, as there still might be requests being processed + or tasks that could add additional requests to the queue. Returns: True if the request queue is empty, False otherwise. @@ -328,19 +328,18 @@ async def is_empty(self) -> bool: async def is_finished(self) -> bool: """Check if the request queue is finished. - A finished queue means that all requests in the queue have been processed (the queue is empty) and there - are no more tasks that could add additional requests to the queue. This is the definitive way to check - if a crawling operation is complete. + A finished queue means that all requests have been processed and there are no more tasks that could add + additional requests to the queue. This is the definitive way to check if a crawling operation is complete. Returns: - True if the request queue is finished (empty and no pending add operations), False otherwise. + True if the request queue is finished and no pending add operations, False otherwise. """ if self._add_requests_tasks: logger.debug('Background add requests tasks are still in progress.') return False - if await self.is_empty(): - logger.debug('The request queue is empty.') + if await self._client.is_finished(): + logger.debug('The request queue is finished.') return True return False diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 6a39e83c12..e23df42886 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -142,6 +142,27 @@ async def handler(context: BasicCrawlingContext) -> None: ] +async def test_no_new_tasks_while_only_request_in_progress() -> None: + crawler = BasicCrawler( + concurrency_settings=ConcurrencySettings(desired_concurrency=4, max_concurrency=4), + ) + + request_manager = await crawler.get_request_manager() + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + await asyncio.sleep(0.2) + + with patch.object( + request_manager, + 'fetch_next_request', + wraps=request_manager.fetch_next_request, + ) as fetch_counter: + await crawler.run(['https://a.placeholder.com']) + + fetch_counter.assert_called_once() + + async def test_respects_no_retry() -> None: crawler = BasicCrawler(max_request_retries=2) calls = list[str]() @@ -1883,10 +1904,16 @@ class _CrawlerInput: def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]: - return [ - asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)) - for crawler_input in crawler_inputs - ] + states = list[StatisticsState]() + for crawler_input in crawler_inputs: + states.append( + asyncio.run( + _run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir) + ) + ) + # Each crawler runs in its own event loop. Drop the cached storage instances between runs. + service_locator.storage_instance_manager.clear_cache() + return states async def test_crawler_state_persistence(tmp_path: Path) -> None: diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 2f12137625..ebcec63a1b 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -544,23 +544,33 @@ async def test_reclaim_request_with_forefront(rq: RequestQueue) -> None: assert next_request.url == 'https://example.com/first' -async def test_is_empty(rq: RequestQueue) -> None: - """Test checking if a request queue is empty.""" - # Initially the queue should be empty +async def test_is_empty_and_is_finished(rq: RequestQueue) -> None: + """Test checking if a request queue is empty and finished.""" + # Initially the queue should be empty and finished assert await rq.is_empty() is True + assert await rq.is_finished() is True # Add a request await rq.add_request('https://example.com') assert await rq.is_empty() is False + assert await rq.is_finished() is False - # Fetch and handle the request + # Fetch the request request = await rq.fetch_next_request() assert request is not None + + # Queue is empty, because there is no request for fetching + assert await rq.is_empty() is True + # Queue is not finished, because there is a request being processed + assert await rq.is_finished() is False + + # Mark the request as handled await rq.mark_request_as_handled(request) - # Queue should be empty again + # Queue should be empty and finished again assert await rq.is_empty() is True + assert await rq.is_finished() is True @pytest.mark.parametrize(