Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/crawlee/storage_clients/_base/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,16 @@ async def reclaim_request(

@abstractmethod
async def is_empty(self) -> bool:
    """Check if the request queue is empty.

    An empty queue is one with no requests available to fetch.

    Returns:
        True if the request queue is empty, False otherwise.
    """

async def is_finished(self) -> bool:
    """Check if the request queue is finished.

    A finished queue is empty and has no requests being processed. This default implementation
    only awaits `is_empty` and returns its result unchanged.

    Returns:
        True if the request queue is finished, False otherwise.
    """
    queue_is_empty = await self.is_empty()
    return queue_is_empty
57 changes: 13 additions & 44 deletions src/crawlee/storage_clients/_file_system/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ def __init__(
self._request_cache_needs_refresh = True
"""Flag indicating whether the cache needs to be refreshed from filesystem."""

self._is_empty_cache: bool | None = None
"""Cache for is_empty result: None means unknown, True/False is cached state."""

self._state = recoverable_state
"""Recoverable state to maintain request ordering, in-progress status, and handled status."""

Expand Down Expand Up @@ -291,9 +288,6 @@ async def drop(self) -> None:
self._request_cache.clear()
self._request_cache_needs_refresh = True

# Invalidate is_empty cache.
self._is_empty_cache = None

@override
async def purge(self) -> None:
async with self._lock:
Expand All @@ -315,9 +309,6 @@ async def purge(self) -> None:
new_total_request_count=0,
)

# Invalidate is_empty cache.
self._is_empty_cache = None

@override
async def add_batch_of_requests(
self,
Expand All @@ -326,7 +317,6 @@ async def add_batch_of_requests(
forefront: bool = False,
) -> AddRequestsResponse:
async with self._lock:
self._is_empty_cache = None
new_total_request_count = self._metadata.total_request_count
new_pending_request_count = self._metadata.pending_request_count
processed_requests = list[ProcessedRequest]()
Expand Down Expand Up @@ -435,9 +425,6 @@ async def add_batch_of_requests(
if forefront:
self._request_cache_needs_refresh = True

# Invalidate is_empty cache.
self._is_empty_cache = None

return AddRequestsResponse(
processed_requests=processed_requests,
unprocessed_requests=unprocessed_requests,
Expand Down Expand Up @@ -482,7 +469,6 @@ async def fetch_next_request(self) -> Request | None:
@override
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
async with self._lock:
self._is_empty_cache = None
state = self._state.current_value

# Check if the request is in progress.
Expand Down Expand Up @@ -530,7 +516,6 @@ async def reclaim_request(
forefront: bool = False,
) -> ProcessedRequest | None:
async with self._lock:
self._is_empty_cache = None
state = self._state.current_value

# Check if the request is in progress.
Expand Down Expand Up @@ -586,39 +571,23 @@ async def reclaim_request(
@override
async def is_empty(self) -> bool:
async with self._lock:
# If we have a cached value, return it immediately.
if self._is_empty_cache is not None:
return self._is_empty_cache

state = self._state.current_value

# If there are in-progress requests, return False immediately.
if len(state.in_progress_requests) > 0:
self._is_empty_cache = False
return False

# If we have cached requests, check them first (fast path).
if self._request_cache:
for req in self._request_cache:
if req.unique_key not in state.handled_requests:
self._is_empty_cache = False
return False
self._is_empty_cache = True
return len(state.in_progress_requests) == 0

Comment on lines -589 to -608

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's safe to remove all this?

# Fallback: check state for unhandled requests.
await self._update_metadata(update_accessed_at=True)

# Check if there are any requests that are not handled
all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys())
unhandled_requests = all_requests - state.handled_requests
# The queue is empty when nothing is available to fetch, i.e. every unhandled request is
# currently in progress.
return self._metadata.pending_request_count - len(self._state.current_value.in_progress_requests) <= 0

@override
async def is_finished(self) -> bool:
# If anything is still available to fetch, the queue is not finished.
if not await self.is_empty():
return False

if unhandled_requests:
self._is_empty_cache = False
return False
async with self._lock:
await self._update_metadata(update_accessed_at=True)

self._is_empty_cache = True
return True
# The queue is finished when there are no pending requests and no in-progress requests.
return self._metadata.pending_request_count == 0 and not self._state.current_value.in_progress_requests

def _get_request_path(self, unique_key: str) -> Path:
"""Get the path to a specific request file.
Expand Down
14 changes: 12 additions & 2 deletions src/crawlee/storage_clients/_memory/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,18 @@ async def is_empty(self) -> bool:
"""
await self._update_metadata(update_accessed_at=True)

# Queue is empty if there are no pending requests and no requests in progress.
return len(self._pending_requests) == 0 and len(self._in_progress_requests) == 0
# Queue is empty if there are no pending requests.
return len(self._pending_requests) == 0

@override
async def is_finished(self) -> bool:
    """Check if the queue is finished.

    The queue is finished when it is empty and no request is in progress.

    Returns:
        True if the queue is finished, False otherwise.
    """
    # Requests still available to fetch mean the queue cannot be finished.
    if not await self.is_empty():
        return False

    # An empty queue is finished only once nothing is being processed any more.
    return not self._in_progress_requests

async def _update_metadata(
self,
Expand Down
17 changes: 11 additions & 6 deletions src/crawlee/storage_clients/_redis/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,7 @@ async def reclaim_request(
@retry_on_error(RedisError)
@override
async def is_empty(self) -> bool:
"""Check if the queue is empty.

Returns:
True if the queue is empty, False otherwise.
"""
# Requests buffered for fetching mean the queue is not empty.
if self._pending_fetch_cache:
return False

Expand All @@ -509,9 +505,18 @@ async def is_empty(self) -> bool:
await self._reclaim_stale_requests()
self._next_reclaim_stale = datetime.now(tz=timezone.utc) + self._RECLAIM_INTERVAL

# Check if there are any requests in the queue.
requests_in_queue = await await_redis_response(self._redis.llen(self._queue_key))
return requests_in_queue == 0

@retry_on_error(RedisError)
@override
async def is_finished(self) -> bool:
    """Check if the queue is finished.

    The queue is finished when it is empty and its metadata reports no pending requests.

    Returns:
        True if the queue is finished, False otherwise.
    """
    # A non-empty queue is never finished, so the metadata lookup is skipped in that case.
    if not await self.is_empty():
        return False

    metadata = await self.get_metadata()
    return metadata.pending_request_count == 0

async def _load_scripts(self) -> None:
"""Ensure Lua scripts are loaded in Redis."""
Expand Down
29 changes: 27 additions & 2 deletions src/crawlee/storage_clients/_sql/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,34 @@ async def reclaim_request(
@retry_on_error(SQLAlchemyError)
@override
async def is_empty(self) -> bool:
    """Check if the queue is empty, i.e. no request is currently available to fetch.

    Returns:
        True if the queue is empty, False otherwise.
    """
    # Requests buffered for fetching mean the queue is not empty.
    if self._pending_fetch_cache:
        return False

    current_time = datetime.now(timezone.utc)
    table = self._ITEM_TABLE

    # A request is available when it is unhandled and either not blocked or its block time has passed.
    not_blocked = or_(table.time_blocked_until.is_(None), table.time_blocked_until < current_time)
    available_query = select(
        exists().where(
            table.request_queue_id == self._id,
            table.is_handled == False,  # noqa: E712
            not_blocked,
        )
    )

    async with self.get_session(with_simple_commit=True) as session:
        query_result = await session.execute(available_query)

        await self._add_buffer_record(session)

        # Read the value while the session is still open.
        has_available_request = query_result.scalar()

    return not has_available_request

@retry_on_error(SQLAlchemyError)
@override
async def is_finished(self) -> bool:
# If the queue is not empty, it is not finished
if not await self.is_empty():
return False

metadata = await self.get_metadata()

async with self.get_session(with_simple_commit=True) as session:
Expand Down Expand Up @@ -629,7 +653,8 @@ async def is_empty(self) -> bool:
has_pending_buffer_updates = buffer_result.scalar()

await self._add_buffer_record(session)
# If there are no pending requests and no buffered updates, the queue is empty

# If there are no pending requests and no buffered updates, the queue is finished
return not has_pending_buffer_updates

# There are pending requests (may be inaccurate), ensure recalculated metadata
Expand Down
17 changes: 8 additions & 9 deletions src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ async def reclaim_request(
async def is_empty(self) -> bool:
"""Check if the request queue is empty.

An empty queue means that there are no requests currently in the queue, either pending or being processed.
However, this does not necessarily mean that the crawling operation is finished, as there still might be
tasks that could add additional requests to the queue.
An empty queue means that there are no requests currently available to fetch. However, this does not
necessarily mean that the crawling operation is finished, as there still might be requests being processed
or tasks that could add additional requests to the queue.

Returns:
True if the request queue is empty, False otherwise.
Expand All @@ -328,19 +328,18 @@ async def is_empty(self) -> bool:
async def is_finished(self) -> bool:
    """Check if the request queue is finished.

    A finished queue means that all requests have been processed and there are no more tasks that could add
    additional requests to the queue. This is the definitive way to check if a crawling operation is complete.

    Returns:
        True if the request queue is finished and there are no pending add operations, False otherwise.
    """
    # Background tasks may still add requests, so the queue cannot be considered finished yet.
    if self._add_requests_tasks:
        logger.debug('Background add requests tasks are still in progress.')
        return False

    # Emptiness alone is not consulted here; the storage client decides whether the queue is finished.
    if await self._client.is_finished():
        logger.debug('The request queue is finished.')
        return True

    return False
Expand Down
35 changes: 31 additions & 4 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ async def handler(context: BasicCrawlingContext) -> None:
]


async def test_no_new_tasks_while_only_request_in_progress() -> None:
    """Check that `fetch_next_request` is called exactly once when a single request is crawled."""
    concurrency = ConcurrencySettings(desired_concurrency=4, max_concurrency=4)
    crawler = BasicCrawler(concurrency_settings=concurrency)

    request_manager = await crawler.get_request_manager()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        # Keep the only request in progress for a while.
        await asyncio.sleep(0.2)

    # Wrap the real method so the crawl behaves normally while calls are counted.
    fetch_spy_patch = patch.object(request_manager, 'fetch_next_request', wraps=request_manager.fetch_next_request)
    with fetch_spy_patch as fetch_spy:
        await crawler.run(['https://a.placeholder.com'])

    fetch_spy.assert_called_once()


async def test_respects_no_retry() -> None:
crawler = BasicCrawler(max_request_retries=2)
calls = list[str]()
Expand Down Expand Up @@ -1883,10 +1904,16 @@ class _CrawlerInput:


def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
    """Run one crawler per input, sequentially, and collect what each run returns.

    Args:
        crawler_inputs: Inputs providing the `id` and `requests` for each crawler run.
        storage_dir: Storage directory passed to every run.

    Returns:
        The results of the individual runs, in input order.
    """
    states = list[StatisticsState]()
    for crawler_input in crawler_inputs:
        states.append(
            asyncio.run(
                _run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)
            )
        )
        # Each crawler runs in its own event loop. Drop the cached storage instances between runs.
        service_locator.storage_instance_manager.clear_cache()
    return states


async def test_crawler_state_persistence(tmp_path: Path) -> None:
Expand Down
20 changes: 15 additions & 5 deletions tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,23 +544,33 @@ async def test_reclaim_request_with_forefront(rq: RequestQueue) -> None:
assert next_request.url == 'https://example.com/first'


async def test_is_empty(rq: RequestQueue) -> None:
"""Test checking if a request queue is empty."""
# Initially the queue should be empty
async def test_is_empty_and_is_finished(rq: RequestQueue) -> None:
    """Verify how `is_empty` and `is_finished` change over the lifecycle of a single request."""
    # Initially the queue should be empty and finished.
    assert await rq.is_empty() is True
    assert await rq.is_finished() is True

    # After adding a request the queue is neither empty nor finished.
    await rq.add_request('https://example.com')
    assert await rq.is_empty() is False
    assert await rq.is_finished() is False

    # Fetch the only request.
    in_flight = await rq.fetch_next_request()
    assert in_flight is not None

    # Nothing is left to fetch, so the queue is empty, but the fetched request is still being processed.
    assert await rq.is_empty() is True
    assert await rq.is_finished() is False

    # Once the request is marked as handled, the queue is empty and finished again.
    await rq.mark_request_as_handled(in_flight)
    assert await rq.is_empty() is True
    assert await rq.is_finished() is True


@pytest.mark.parametrize(
Expand Down
Loading