Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions docs/02_concepts/08_pagination.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import ApiLink from '@site/src/components/ApiLink';

import PaginationAsyncExample from '!!raw-loader!./code/08_pagination_async.py';
import PaginationSyncExample from '!!raw-loader!./code/08_pagination_sync.py';

import IterateItemsAsyncExample from '!!raw-loader!./code/08_iterate_items_async.py';
import IterateItemsSyncExample from '!!raw-loader!./code/08_iterate_items_sync.py';

Expand Down Expand Up @@ -45,7 +44,10 @@ The <ApiLink to="class/ListPage">`ListPage`</ApiLink> interface offers several k

## Generator-based iteration

For most use cases, `iterate_items()` is the recommended way to process all items in a dataset. It handles pagination automatically using a Python generator, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself.
For collection clients, the `iterate` method returns an iterator that lazily fetches as many pages as needed
to retrieve every item matching the filters. For dataset, key-value store and request queue clients, the
matching helpers are `iterate_items`, `iterate_keys` and `iterate_requests`. They handle pagination
automatically, so you don't need to manage offsets, limits or cursors yourself.

<Tabs>
<TabItem value="AsyncExample" label="Async client" default>
Expand All @@ -59,7 +61,3 @@ For most use cases, `iterate_items()` is the recommended way to process all item
</CodeBlock>
</TabItem>
</Tabs>

`iterate_items()` accepts the same filtering parameters as `list_items()` (`clean`, `fields`, `omit`, `unwind`, `skip_empty`, `skip_hidden`), so you can combine automatic pagination with data filtering.

Similarly, `KeyValueStoreClient` provides an `iterate_keys()` method for iterating over all keys in a key-value store without manual pagination.
11 changes: 8 additions & 3 deletions docs/02_concepts/code/08_iterate_items_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ async def main() -> None:
apify_client = ApifyClientAsync(TOKEN)
dataset_client = apify_client.dataset('dataset-id')

# Iterate through all items automatically.
async for item in dataset_client.iterate_items():
print(item)
# Define the pagination parameters
limit = 1500 # Number of items in total
offset = 100 # Starting offset

# Iterate through items automatically, lazily sending as many API calls
# as needed and receiving items in chunks.
async for item in dataset_client.iterate_items(limit=limit, offset=offset):
print(item) # Process the item as needed
11 changes: 8 additions & 3 deletions docs/02_concepts/code/08_iterate_items_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ def main() -> None:
apify_client = ApifyClient(TOKEN)
dataset_client = apify_client.dataset('dataset-id')

# Iterate through all items automatically.
for item in dataset_client.iterate_items():
print(item)
# Define the pagination parameters
limit = 1500 # Number of items in total
offset = 100 # Starting offset

# Iterate through items automatically, lazily sending as many API calls
# as needed and receiving items in chunks.
for item in dataset_client.iterate_items(limit=limit, offset=offset):
print(item) # Process the item as needed


if __name__ == '__main__':
Expand Down
27 changes: 8 additions & 19 deletions docs/02_concepts/code/08_pagination_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,15 @@ async def main() -> None:
dataset_client = apify_client.dataset('dataset-id')

# Define the pagination parameters
limit = 1000 # Number of items per page
limit = 1000  # Number of items to request from the API
offset = 0 # Starting offset
all_items = [] # List to store all fetched items

while True:
# Fetch a page of items
response = await dataset_client.list_items(limit=limit, offset=offset)
items = response.items
total = response.total
# Send single API call to fetch paginated items.
# (number of items per single call can be limited by API)
paginated_items = await dataset_client.list_items(limit=limit, offset=offset)

print(f'Fetched {len(items)} items')
# Inspect pagination metadata returned by API
print(paginated_items.total)

# Add the fetched items to the complete list
all_items.extend(items)

# Exit the loop if there are no more items to fetch
if offset + limit >= total:
break

# Increment the offset for the next page
offset += limit

print(f'Overall fetched {len(all_items)} items')
for item in paginated_items.items:
print(item) # Process the item as needed
27 changes: 8 additions & 19 deletions docs/02_concepts/code/08_pagination_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,15 @@ def main() -> None:
dataset_client = apify_client.dataset('dataset-id')

# Define the pagination parameters
limit = 1000 # Number of items per page
limit = 1000  # Number of items to request from the API
offset = 0 # Starting offset
all_items = [] # List to store all fetched items

while True:
# Fetch a page of items
response = dataset_client.list_items(limit=limit, offset=offset)
items = response.items
total = response.total
# Send single API call to fetch paginated items.
# (number of items per single call can be limited by API)
paginated_items = dataset_client.list_items(limit=limit, offset=offset)

print(f'Fetched {len(items)} items')
# Inspect pagination metadata returned by API
print(paginated_items.total)

# Add the fetched items to the complete list
all_items.extend(items)

# Exit the loop if there are no more items to fetch
if offset + limit >= total:
break

# Increment the offset for the next page
offset += limit

print(f'Overall fetched {len(all_items)} items')
for item in paginated_items.items:
print(item) # Process the item as needed
1 change: 1 addition & 0 deletions scripts/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
(re.compile(r'\bSynchronous\b'), 'Asynchronous'),
(re.compile(r'Retry a function'), 'Retry an async function'),
(re.compile(r'Function to retry'), 'Async function to retry'),
(re.compile(r'returned page also supports iteration: `for'), 'returned page also supports iteration: `async for'),
]
"""Patterns for converting sync docstrings to async docstrings."""

Expand Down
223 changes: 223 additions & 0 deletions src/apify_client/_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Protocol, TypeVar, overload

from apify_client._models import KeyValueStoreKey, ListOfKeys, ListOfRequests, Request

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator

T = TypeVar('T')


class HasItems(Protocol[T]):
    """Structural type for a single page of a paginated API response.

    Any object exposing an `items` list satisfies this protocol. The pagination
    helpers in this module rely only on this attribute (plus an optional `count`
    attribute, which they read via `getattr` with a fallback).
    """

    # The items returned on this page of the paginated response.
    items: list[T]


def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
"""Return minimum of two limit parameters, treating `None` or `0` as infinity.

The Apify API treats `0` as no limit for the `limit` parameter, so `0` here means infinity.
Returns `None` when both inputs represent infinity.
"""
if a == 0:
a = None
if b == 0:
b = None
if a is None:
return b
if b is None:
return a
return min(a, b)


def get_items_iterator(
    callback: Callable[..., HasItems[T]],
    *,
    limit: int | None = None,
    offset: int | None = None,
    chunk_size: int | None = None,
) -> Iterator[T]:
    """Iterate over every item in an offset-paginated listing, one item at a time.

    The `callback` is invoked lazily, once per page. It must accept `limit` and
    `offset` keyword arguments and return an object with an `items` list. When the
    returned page also exposes a `count` attribute, it is used to advance the
    offset (the Apify API's `count` reflects items scanned, which can exceed the
    number of items returned when filters are applied).

    Iteration ends when a page comes back empty or the user-requested `limit` is
    satisfied. The page's `total` field is deliberately ignored, since it may
    change between calls.

    Args:
        callback: Function returning a single page of items.
        limit: Maximum total number of items to yield; `None` or `0` means unlimited.
        offset: Offset of the first item to fetch.
        chunk_size: Upper bound on items per API call; `None` or `0` defers to the API.
    """
    per_call_limit = chunk_size or 0
    start_offset = offset or 0
    max_items = limit or 0
    consumed = 0

    while True:
        # Ask only for what is still needed; 0 lets the API choose the page size.
        if max_items:
            request_limit = _min_for_limit_param(max_items - consumed, per_call_limit)
        else:
            request_limit = per_call_limit

        page = callback(limit=request_limit, offset=start_offset + consumed)
        yield from page.items

        # Advance by `count` when available (items scanned server-side), otherwise
        # by the number of items actually returned.
        consumed += getattr(page, 'count', len(page.items))

        if not page.items:
            break
        if max_items and consumed >= max_items:
            break


async def get_items_iterator_async(
    callback: Callable[..., Awaitable[HasItems[T]]],
    *,
    limit: int | None = None,
    offset: int | None = None,
    chunk_size: int | None = None,
) -> AsyncIterator[T]:
    """Async variant of :func:`get_items_iterator`.

    The `callback` must be an awaitable accepting `limit` and `offset` keyword
    arguments and returning a single page of items. Pagination semantics match
    the sync variant: `count` (when present) is used for offset bookkeeping, an
    empty page or a satisfied `limit` ends iteration, and `total` is ignored.
    """
    per_call_limit = chunk_size or 0
    start_offset = offset or 0
    max_items = limit or 0
    consumed = 0

    while True:
        # Ask only for what is still needed; 0 lets the API choose the page size.
        if max_items:
            request_limit = _min_for_limit_param(max_items - consumed, per_call_limit)
        else:
            request_limit = per_call_limit

        page = await callback(limit=request_limit, offset=start_offset + consumed)
        for element in page.items:
            yield element

        # Advance by `count` when available, otherwise by the returned item count.
        consumed += getattr(page, 'count', len(page.items))

        if not page.items or (max_items and consumed >= max_items):
            break


@overload
def get_cursor_iterator(
    callback: Callable[..., ListOfKeys],
    *,
    cursor: str | None = None,
    limit: int | None = None,
    chunk_size: int | None = None,
) -> Iterator[KeyValueStoreKey]: ...
@overload
def get_cursor_iterator(
    callback: Callable[..., ListOfRequests],
    *,
    cursor: str | None = None,
    limit: int | None = None,
    chunk_size: int | None = None,
) -> Iterator[Request]: ...


def get_cursor_iterator(
    callback: Callable[..., ListOfKeys] | Callable[..., ListOfRequests],
    *,
    cursor: str | None = None,
    limit: int | None = None,
    chunk_size: int | None = None,
) -> Iterator[Request] | Iterator[KeyValueStoreKey]:
    """Yield individual items from cursor-paginated API responses.

    The `callback` is invoked lazily to fetch each page. Each page must be a
    `ListOfKeys` or a `ListOfRequests`; the cursor for the next request is read
    from `next_exclusive_start_key` or `next_cursor`, respectively. Iteration ends
    when a page returns no items, the next cursor is `None`, or the user-requested
    `limit` is reached.

    Args:
        callback: Function returning a single page of items. Receives the current
            cursor via the `cursor` keyword argument and a `limit` keyword argument.
        cursor: Value of the cursor for the first request, or `None` to start from
            the beginning.
        limit: Maximum total number of items to yield across all pages. `None` or
            `0` means no limit.
        chunk_size: Maximum number of items requested per API call. `None` or `0`
            lets the API decide.

    Raises:
        TypeError: If the callback returns a page type other than `ListOfKeys`
            or `ListOfRequests`.
    """
    # `0` means "let the API decide" / "no limit" for these parameters.
    effective_chunk = chunk_size or 0
    initial_limit = limit or 0
    fetched_items = 0

    while True:
        current_page = callback(
            limit=effective_chunk
            if not initial_limit
            else _min_for_limit_param(initial_limit - fetched_items, effective_chunk),
            cursor=cursor,
        )
        yield from current_page.items

        # `count` (when present) can exceed `len(items)` when server-side filters apply.
        fetched_items += getattr(current_page, 'count', len(current_page.items))

        if isinstance(current_page, ListOfKeys):
            cursor = current_page.next_exclusive_start_key
        elif isinstance(current_page, ListOfRequests):
            cursor = current_page.next_cursor
        else:
            raise TypeError('Unsupported page type returned by callback; expected ListOfKeys or ListOfRequests.')

        if not current_page.items or cursor is None or (initial_limit and fetched_items >= initial_limit):
            break


@overload
def get_cursor_iterator_async(
    callback: Callable[..., Awaitable[ListOfKeys]],
    *,
    cursor: str | None = None,
    limit: int | None = None,
    chunk_size: int | None = None,
) -> AsyncIterator[KeyValueStoreKey]: ...
@overload
def get_cursor_iterator_async(
    callback: Callable[..., Awaitable[ListOfRequests]],
    *,
    cursor: str | None = None,
    limit: int | None = None,
    chunk_size: int | None = None,
) -> AsyncIterator[Request]: ...


async def get_cursor_iterator_async(
    callback: Callable[..., Awaitable[ListOfKeys]] | Callable[..., Awaitable[ListOfRequests]],
    *,
    cursor: str | None = None,
    limit: int | None = None,
    chunk_size: int | None = None,
) -> AsyncIterator[KeyValueStoreKey] | AsyncIterator[Request]:
    """Async variant of :func:`get_cursor_iterator`.

    The `callback` must be an awaitable returning a single page of items, either a
    `ListOfKeys` or a `ListOfRequests`; the next-page cursor is read from
    `next_exclusive_start_key` or `next_cursor`, respectively. Iteration ends when
    a page returns no items, the next cursor is `None`, or the user-requested
    `limit` is reached.

    Args:
        callback: Awaitable function returning a single page of items. Receives the
            current cursor via the `cursor` keyword argument and a `limit` keyword argument.
        cursor: Value of the cursor for the first request, or `None` to start from
            the beginning.
        limit: Maximum total number of items to yield across all pages. `None` or
            `0` means no limit.
        chunk_size: Maximum number of items requested per API call. `None` or `0`
            lets the API decide.

    Raises:
        TypeError: If the callback returns a page type other than `ListOfKeys`
            or `ListOfRequests`.
    """
    # `0` means "let the API decide" / "no limit" for these parameters.
    effective_chunk = chunk_size or 0
    initial_limit = limit or 0
    fetched_items = 0

    while True:
        current_page = await callback(
            limit=effective_chunk
            if not initial_limit
            else _min_for_limit_param(initial_limit - fetched_items, effective_chunk),
            cursor=cursor,
        )
        for item in current_page.items:
            yield item

        # `count` (when present) can exceed `len(items)` when server-side filters apply.
        fetched_items += getattr(current_page, 'count', len(current_page.items))

        if isinstance(current_page, ListOfKeys):
            cursor = current_page.next_exclusive_start_key
        elif isinstance(current_page, ListOfRequests):
            cursor = current_page.next_cursor
        else:
            raise TypeError('Unsupported page type returned by callback; expected ListOfKeys or ListOfRequests.')

        if not current_page.items or cursor is None or (initial_limit and fetched_items >= initial_limit):
            break
Loading
Loading