diff --git a/docs/02_concepts/08_pagination.mdx b/docs/02_concepts/08_pagination.mdx index 9f1da230..167fad4f 100644 --- a/docs/02_concepts/08_pagination.mdx +++ b/docs/02_concepts/08_pagination.mdx @@ -12,7 +12,6 @@ import ApiLink from '@site/src/components/ApiLink'; import PaginationAsyncExample from '!!raw-loader!./code/08_pagination_async.py'; import PaginationSyncExample from '!!raw-loader!./code/08_pagination_sync.py'; - import IterateItemsAsyncExample from '!!raw-loader!./code/08_iterate_items_async.py'; import IterateItemsSyncExample from '!!raw-loader!./code/08_iterate_items_sync.py'; @@ -45,7 +44,10 @@ The `ListPage` interface offers several k ## Generator-based iteration -For most use cases, `iterate_items()` is the recommended way to process all items in a dataset. It handles pagination automatically using a Python generator, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself. +For collection clients, the `iterate` method returns an iterator that lazily fetches as many pages as needed +to retrieve every item matching the filters. For dataset, key-value store and request queue clients, the +matching helpers are `iterate_items`, `iterate_keys` and `iterate_requests`. They handle pagination +automatically, so you don't need to manage offsets, limits or cursors yourself. @@ -59,7 +61,3 @@ For most use cases, `iterate_items()` is the recommended way to process all item - -`iterate_items()` accepts the same filtering parameters as `list_items()` (`clean`, `fields`, `omit`, `unwind`, `skip_empty`, `skip_hidden`), so you can combine automatic pagination with data filtering. - -Similarly, `KeyValueStoreClient` provides an `iterate_keys()` method for iterating over all keys in a key-value store without manual pagination. 
diff --git a/docs/02_concepts/code/08_iterate_items_async.py b/docs/02_concepts/code/08_iterate_items_async.py index fba9b5b0..e727680e 100644 --- a/docs/02_concepts/code/08_iterate_items_async.py +++ b/docs/02_concepts/code/08_iterate_items_async.py @@ -7,6 +7,11 @@ async def main() -> None: apify_client = ApifyClientAsync(TOKEN) dataset_client = apify_client.dataset('dataset-id') - # Iterate through all items automatically. - async for item in dataset_client.iterate_items(): - print(item) + # Define the pagination parameters + limit = 1500 # Number of items in total + offset = 100 # Starting offset + + # Iterate through items automatically, lazily sending as many API calls + # as needed and receiving items in chunks. + async for item in dataset_client.iterate_items(limit=limit, offset=offset): + print(item) # Process the item as needed diff --git a/docs/02_concepts/code/08_iterate_items_sync.py b/docs/02_concepts/code/08_iterate_items_sync.py index 005c899f..2b6ed487 100644 --- a/docs/02_concepts/code/08_iterate_items_sync.py +++ b/docs/02_concepts/code/08_iterate_items_sync.py @@ -7,9 +7,14 @@ def main() -> None: apify_client = ApifyClient(TOKEN) dataset_client = apify_client.dataset('dataset-id') - # Iterate through all items automatically. - for item in dataset_client.iterate_items(): - print(item) + # Define the pagination parameters + limit = 1500 # Number of items in total + offset = 100 # Starting offset + + # Iterate through items automatically, lazily sending as many API calls + # as needed and receiving items in chunks. 
+ for item in dataset_client.iterate_items(limit=limit, offset=offset): + print(item) # Process the item as needed if __name__ == '__main__': diff --git a/docs/02_concepts/code/08_pagination_async.py b/docs/02_concepts/code/08_pagination_async.py index 50e9d047..23ac5fde 100644 --- a/docs/02_concepts/code/08_pagination_async.py +++ b/docs/02_concepts/code/08_pagination_async.py @@ -10,26 +10,15 @@ async def main() -> None: dataset_client = apify_client.dataset('dataset-id') # Define the pagination parameters - limit = 1000 # Number of items per page + limit = 1000 # Number of items to request from API offset = 0 # Starting offset - all_items = [] # List to store all fetched items - while True: - # Fetch a page of items - response = await dataset_client.list_items(limit=limit, offset=offset) - items = response.items - total = response.total + # Send single API call to fetch paginated items. + # (number of items per single call can be limited by API) + paginated_items = await dataset_client.list_items(limit=limit, offset=offset) - print(f'Fetched {len(items)} items') + # Inspect pagination metadata returned by API + print(paginated_items.total) - # Add the fetched items to the complete list - all_items.extend(items) - - # Exit the loop if there are no more items to fetch - if offset + limit >= total: - break - - # Increment the offset for the next page - offset += limit - - print(f'Overall fetched {len(all_items)} items') + for item in paginated_items.items: + print(item) # Process the item as needed diff --git a/docs/02_concepts/code/08_pagination_sync.py b/docs/02_concepts/code/08_pagination_sync.py index 3beb4fbe..f144339e 100644 --- a/docs/02_concepts/code/08_pagination_sync.py +++ b/docs/02_concepts/code/08_pagination_sync.py @@ -10,26 +10,15 @@ def main() -> None: dataset_client = apify_client.dataset('dataset-id') # Define the pagination parameters - limit = 1000 # Number of items per page + limit = 1000 # Number of items to request from API offset = 0 # Starting 
offset - all_items = [] # List to store all fetched items - while True: - # Fetch a page of items - response = dataset_client.list_items(limit=limit, offset=offset) - items = response.items - total = response.total + # Send single API call to fetch paginated items. + # (number of items per single call can be limited by API) + paginated_items = dataset_client.list_items(limit=limit, offset=offset) - print(f'Fetched {len(items)} items') + # Inspect pagination metadata returned by API + print(paginated_items.total) - # Add the fetched items to the complete list - all_items.extend(items) - - # Exit the loop if there are no more items to fetch - if offset + limit >= total: - break - - # Increment the offset for the next page - offset += limit - - print(f'Overall fetched {len(all_items)} items') + for item in paginated_items.items: + print(item) # Process the item as needed diff --git a/scripts/_utils.py b/scripts/_utils.py index 48612c54..b7d91853 100644 --- a/scripts/_utils.py +++ b/scripts/_utils.py @@ -27,6 +27,7 @@ (re.compile(r'\bSynchronous\b'), 'Asynchronous'), (re.compile(r'Retry a function'), 'Retry an async function'), (re.compile(r'Function to retry'), 'Async function to retry'), + (re.compile(r'returned page also supports iteration: `for'), 'returned page also supports iteration: `async for'), ] """Patterns for converting sync docstrings to async docstrings.""" diff --git a/src/apify_client/_pagination.py b/src/apify_client/_pagination.py new file mode 100644 index 00000000..feee601e --- /dev/null +++ b/src/apify_client/_pagination.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, TypeVar, overload + +from apify_client._models import KeyValueStoreKey, ListOfKeys, ListOfRequests, Request + +if TYPE_CHECKING: + from collections.abc import AsyncIterator, Awaitable, Callable, Iterator + +T = TypeVar('T') + + +class HasItems(Protocol[T]): + items: list[T] + + +def _min_for_limit_param(a: int | None, b: int | 
None) -> int | None: + """Return minimum of two limit parameters, treating `None` or `0` as infinity. + + The Apify API treats `0` as no limit for the `limit` parameter, so `0` here means infinity. + Returns `None` when both inputs represent infinity. + """ + if a == 0: + a = None + if b == 0: + b = None + if a is None: + return b + if b is None: + return a + return min(a, b) + + +def get_items_iterator( + callback: Callable[..., HasItems[T]], + *, + limit: int | None = None, + offset: int | None = None, + chunk_size: int | None = None, +) -> Iterator[T]: + """Yield individual items from offset-based paginated API responses. + + The `callback` is invoked lazily to fetch each page from the API. It must accept `limit` and + `offset` keyword arguments and return an object whose `items` attribute is a list. If the + object also exposes a `count` attribute, it is used for offset bookkeeping (the Apify API's + `count` reflects items scanned, which can exceed items returned when filters are applied). + + Iteration stops when a page returns no items or when the user-requested `limit` is reached. + The `total` field is intentionally not consulted, because it can change between calls. + + Args: + callback: Function returning a single page of items. + limit: Maximum total number of items to yield across all pages. `None` or `0` means no limit. + offset: Starting offset for the first page. + chunk_size: Maximum number of items requested per API call. `None` or `0` lets the API decide. 
+ """ + effective_chunk = chunk_size or 0 + initial_offset = offset or 0 + initial_limit = limit or 0 + fetched_items = 0 + + while True: + current_page = callback( + limit=effective_chunk + if not initial_limit + else _min_for_limit_param(initial_limit - fetched_items, effective_chunk), + offset=initial_offset + fetched_items, + ) + yield from current_page.items + + fetched_items += getattr(current_page, 'count', len(current_page.items)) + + if not current_page.items or (initial_limit and fetched_items >= initial_limit): + break + + +async def get_items_iterator_async( + callback: Callable[..., Awaitable[HasItems[T]]], + *, + limit: int | None = None, + offset: int | None = None, + chunk_size: int | None = None, +) -> AsyncIterator[T]: + """Async variant of :func:`get_items_iterator`. + + The `callback` must be an awaitable returning a single page of items. + """ + effective_chunk = chunk_size or 0 + initial_offset = offset or 0 + initial_limit = limit or 0 + fetched_items = 0 + + while True: + current_page = await callback( + limit=effective_chunk + if not initial_limit + else _min_for_limit_param(initial_limit - fetched_items, effective_chunk), + offset=initial_offset + fetched_items, + ) + for item in current_page.items: + yield item + + fetched_items += getattr(current_page, 'count', len(current_page.items)) + + if not current_page.items or (initial_limit and fetched_items >= initial_limit): + break + + +@overload +def get_cursor_iterator( + callback: Callable[..., ListOfKeys], + *, + cursor: str | None = None, + limit: int | None = None, + chunk_size: int | None = None, +) -> Iterator[KeyValueStoreKey]: ... +@overload +def get_cursor_iterator( + callback: Callable[..., ListOfRequests], + *, + cursor: str | None = None, + limit: int | None = None, + chunk_size: int | None = None, +) -> Iterator[Request]: ... 
+ + +def get_cursor_iterator( + callback: Callable[..., ListOfKeys] | Callable[..., ListOfRequests], + *, + cursor: str | None = None, + limit: int | None = None, + chunk_size: int | None = None, +) -> Iterator[Request] | Iterator[KeyValueStoreKey]: + """Yield individual items from cursor-paginated API responses. + + Each page is expected to expose `items` and its next cursor (`next_exclusive_start_key` or `next_cursor`). + Iteration ends when a page returns no items, the next cursor is `None`, or the requested `limit` is reached. + + Args: + callback: Function returning a single page of items. It is called with `cursor` and `limit` + keyword arguments and must return a `ListOfKeys` or `ListOfRequests` page; any other page + type raises `TypeError`. + cursor: Value of the cursor for the first request, or `None` to start from the beginning. + limit: Maximum total number of items to yield across all pages. `None` or `0` means no limit. + chunk_size: Maximum number of items requested per API call. `None` or `0` lets the API decide. + """ + effective_chunk = chunk_size or 0 + initial_limit = limit or 0 + fetched_items = 0 + + while True: + current_page = callback( + limit=effective_chunk + if not initial_limit + else _min_for_limit_param(initial_limit - fetched_items, effective_chunk), + cursor=cursor, + ) + yield from current_page.items + + fetched_items += getattr(current_page, 'count', len(current_page.items)) + + if isinstance(current_page, ListOfKeys): + cursor = current_page.next_exclusive_start_key + elif isinstance(current_page, ListOfRequests): + cursor = current_page.next_cursor + else: + raise TypeError('Unsupported page type returned by callback; expected ListOfKeys or ListOfRequests.') + + if not current_page.items or cursor is None or (initial_limit and fetched_items >= initial_limit): + break + + +@overload +def get_cursor_iterator_async( + callback: Callable[..., Awaitable[ListOfKeys]], + *, + cursor: str | None = None, + limit: int | None = None, + chunk_size: int | None = None, +) -> AsyncIterator[KeyValueStoreKey]: ... 
+@overload +def get_cursor_iterator_async( + callback: Callable[..., Awaitable[ListOfRequests]], + *, + cursor: str | None = None, + limit: int | None = None, + chunk_size: int | None = None, +) -> AsyncIterator[Request]: ... + + +async def get_cursor_iterator_async( + callback: Callable[..., Awaitable[ListOfKeys]] | Callable[..., Awaitable[ListOfRequests]], + *, + cursor: str | None = None, + limit: int | None = None, + chunk_size: int | None = None, +) -> AsyncIterator[KeyValueStoreKey] | AsyncIterator[Request]: + """Async variant of :func:`get_cursor_iterator`.""" + effective_chunk = chunk_size or 0 + initial_limit = limit or 0 + fetched_items = 0 + + while True: + current_page = await callback( + limit=effective_chunk + if not initial_limit + else _min_for_limit_param(initial_limit - fetched_items, effective_chunk), + cursor=cursor, + ) + for item in current_page.items: + yield item + + fetched_items += getattr(current_page, 'count', len(current_page.items)) + + if isinstance(current_page, ListOfKeys): + cursor = current_page.next_exclusive_start_key + elif isinstance(current_page, ListOfRequests): + cursor = current_page.next_cursor + else: + raise TypeError('Unsupported page type returned by callback; expected ListOfKeys or ListOfRequests.') + + if not current_page.items or cursor is None or (initial_limit and fetched_items >= initial_limit): + break diff --git a/src/apify_client/_resource_clients/actor_collection.py b/src/apify_client/_resource_clients/actor_collection.py index f9e6ac16..9fbdb928 100644 --- a/src/apify_client/_resource_clients/actor_collection.py +++ b/src/apify_client/_resource_clients/actor_collection.py @@ -13,12 +13,15 @@ ListOfActors, ListOfActorsResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync from apify_client._utils import to_seconds if TYPE_CHECKING: + from collections.abc import 
AsyncIterator, Iterator from datetime import timedelta + from apify_client._models import ActorShort from apify_client._types import Timeout @@ -69,6 +72,40 @@ def list( result = self._list(timeout=timeout, my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) return ListOfActorsResponse.model_validate(result).data + def iterate( + self, + *, + my: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt', + timeout: Timeout = 'medium', + ) -> Iterator[ActorShort]: + """Iterate over the Actors the user has created or used. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors + + Args: + my: If True, will return only Actors which the user has created themselves. + limit: How many Actors to list. + offset: What Actor to include as first when retrieving the list. + desc: Whether to sort the Actors in descending order based on their creation date. + sort_by: Field to sort the results by. + timeout: Timeout for the API HTTP request. + + Yields: + The Actors available to the user matching the specified filters. 
+ """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfActors: + return self.list(my=my, limit=limit, offset=offset, desc=desc, sort_by=sort_by, timeout=timeout) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def create( self, *, @@ -214,6 +251,40 @@ async def list( result = await self._list(timeout=timeout, my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) return ListOfActorsResponse.model_validate(result).data + def iterate( + self, + *, + my: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt', + timeout: Timeout = 'medium', + ) -> AsyncIterator[ActorShort]: + """Iterate over the Actors the user has created or used. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors + + Args: + my: If True, will return only Actors which the user has created themselves. + limit: How many Actors to list. + offset: What Actor to include as first when retrieving the list. + desc: Whether to sort the Actors in descending order based on their creation date. + sort_by: Field to sort the results by. + timeout: Timeout for the API HTTP request. + + Yields: + The Actors available to the user matching the specified filters. 
+ """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfActors: + return await self.list(my=my, limit=limit, offset=offset, desc=desc, sort_by=sort_by, timeout=timeout) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def create( self, *, diff --git a/src/apify_client/_resource_clients/actor_env_var_collection.py b/src/apify_client/_resource_clients/actor_env_var_collection.py index 788745b4..f43323d3 100644 --- a/src/apify_client/_resource_clients/actor_env_var_collection.py +++ b/src/apify_client/_resource_clients/actor_env_var_collection.py @@ -7,6 +7,8 @@ from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + from apify_client._types import Timeout @@ -43,6 +45,13 @@ def list(self, *, timeout: Timeout = 'short') -> ListOfEnvVars: result = self._list(timeout=timeout) return ListOfEnvVarsResponse.model_validate(result).data + def iterate(self, *, timeout: Timeout = 'short') -> Iterator[EnvVar]: + """Iterate over the available Actor environment variables. + + There is no possibility to control the pagination on this endpoint. + """ + return iter(self.list(timeout=timeout).items) + def create( self, *, @@ -104,6 +113,14 @@ async def list(self, *, timeout: Timeout = 'short') -> ListOfEnvVars: result = await self._list(timeout=timeout) return ListOfEnvVarsResponse.model_validate(result).data + async def iterate(self, *, timeout: Timeout = 'short') -> AsyncIterator[EnvVar]: + """Iterate over the available Actor environment variables. + + There is no possibility to control the pagination on this endpoint. 
+ """ + for item in (await self.list(timeout=timeout)).items: + yield item + async def create( self, *, diff --git a/src/apify_client/_resource_clients/actor_version_collection.py b/src/apify_client/_resource_clients/actor_version_collection.py index b1aa21a6..46070ebd 100644 --- a/src/apify_client/_resource_clients/actor_version_collection.py +++ b/src/apify_client/_resource_clients/actor_version_collection.py @@ -18,6 +18,8 @@ from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + from apify_client._literals import VersionSourceType from apify_client._types import Timeout @@ -47,6 +49,8 @@ def __init__( def list(self, *, timeout: Timeout = 'short') -> ListOfVersions: """List the available Actor versions. + The returned page also supports iteration: `for item in client.list()` yields individual versions. + https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions Args: @@ -58,6 +62,13 @@ def list(self, *, timeout: Timeout = 'short') -> ListOfVersions: result = self._list(timeout=timeout) return ListOfVersionsResponse.model_validate(result).data + def iterate(self, *, timeout: Timeout = 'short') -> Iterator[Version]: + """Iterate over the available Actor versions. + + There is no possibility to control the pagination on this endpoint. + """ + return iter(self.list(timeout=timeout).items) + def create( self, *, @@ -134,6 +145,8 @@ def __init__( async def list(self, *, timeout: Timeout = 'short') -> ListOfVersions: """List the available Actor versions. + The returned page also supports iteration: `async for item in client.list()` yields individual versions. 
+ https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions Args: @@ -145,6 +158,14 @@ async def list(self, *, timeout: Timeout = 'short') -> ListOfVersions: result = await self._list(timeout=timeout) return ListOfVersionsResponse.model_validate(result).data + async def iterate(self, *, timeout: Timeout = 'short') -> AsyncIterator[Version]: + """Iterate over the available Actor versions. + + There is no possibility to control the pagination on this endpoint. + """ + for item in (await self.list(timeout=timeout)).items: + yield item + async def create( self, *, diff --git a/src/apify_client/_resource_clients/build_collection.py b/src/apify_client/_resource_clients/build_collection.py index a55ee6c2..ad90c4de 100644 --- a/src/apify_client/_resource_clients/build_collection.py +++ b/src/apify_client/_resource_clients/build_collection.py @@ -4,9 +4,13 @@ from apify_client._docs import docs_group from apify_client._models import ListOfBuilds, ListOfBuildsResponse +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + + from apify_client._models import BuildShort from apify_client._types import Timeout @@ -57,6 +61,37 @@ def list( result = self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfBuildsResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[BuildShort]: + """Iterate over all Actor builds. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. 
+ + https://docs.apify.com/api/v2#/reference/actors/build-collection/get-list-of-builds + https://docs.apify.com/api/v2#/reference/actor-builds/build-collection/get-user-builds-list + + Args: + limit: How many builds to retrieve. + offset: What build to include as first when retrieving the list. + desc: Whether to sort the builds in descending order based on their start date. + timeout: Timeout for the API HTTP request. + + Yields: + The Actor builds matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfBuilds: + return self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator(_callback, limit=limit, offset=offset) + @docs_group('Resource clients') class BuildCollectionClientAsync(ResourceClientAsync): @@ -104,3 +139,34 @@ async def list( """ result = await self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfBuildsResponse.model_validate(result).data + + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[BuildShort]: + """Iterate over all Actor builds. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actors/build-collection/get-list-of-builds + https://docs.apify.com/api/v2#/reference/actor-builds/build-collection/get-user-builds-list + + Args: + limit: How many builds to retrieve. + offset: What build to include as first when retrieving the list. + desc: Whether to sort the builds in descending order based on their start date. + timeout: Timeout for the API HTTP request. + + Yields: + The Actor builds matching the specified filters. 
+ """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfBuilds: + return await self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) diff --git a/src/apify_client/_resource_clients/dataset.py b/src/apify_client/_resource_clients/dataset.py index 95855bde..16967735 100644 --- a/src/apify_client/_resource_clients/dataset.py +++ b/src/apify_client/_resource_clients/dataset.py @@ -8,6 +8,7 @@ from apify_client._docs import docs_group from apify_client._models import Dataset, DatasetResponse, DatasetStatistics, DatasetStatisticsResponse +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync from apify_client._utils import ( create_storage_content_signature, @@ -208,8 +209,9 @@ def list_items( items=items, total=int(response.headers['x-apify-pagination-total']), offset=int(response.headers['x-apify-pagination-offset']), - # x-apify-pagination-count returns invalid values when hidden/empty items are skipped - count=len(items), + # x-apify-pagination-count returns count of processed items, not count of returned items + # This makes difference when items were filtered using hidden/empty + count=max(int(response.headers['x-apify-pagination-count']), len(items)), # API returns 999999999999 when no limit is used limit=int(response.headers['x-apify-pagination-limit']), desc=response.headers['x-apify-pagination-desc'].lower() == 'true', @@ -218,7 +220,7 @@ def list_items( def iterate_items( self, *, - offset: int = 0, + offset: int | None = None, limit: int | None = None, clean: bool | None = None, desc: bool | None = None, @@ -228,10 +230,14 @@ def iterate_items( skip_empty: bool | None = None, skip_hidden: bool | None = None, signature: str | None = None, + chunk_size: int | None = None, timeout: Timeout = 'long', ) -> 
Iterator[dict]: """Iterate over the items in the dataset. + Simple `list_items` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items Args: @@ -259,29 +265,17 @@ def iterate_items( skip_hidden: If True, then hidden fields are skipped from the output, i.e. fields starting with the # character. signature: Signature used to access the items. + chunk_size: Maximum number of items requested per API call when iterating across pages. timeout: Timeout for the API HTTP request. Yields: An item from the dataset. """ - cache_size = 1000 - - should_finish = False - read_items = 0 - - # We can't rely on DatasetItemsPage.total because that is updated with a delay, - # so if you try to read the dataset items right after a run finishes, you could miss some. - # Instead, we just read and read until we reach the limit, or until there are no more items to read. - while not should_finish: - effective_limit = cache_size - if limit is not None: - if read_items == limit: - break - effective_limit = min(cache_size, limit - read_items) - - current_items_page = self.list_items( - offset=offset + read_items, - limit=effective_limit, + + def _callback(*, limit: int | None = None, offset: int | None = None) -> DatasetItemsPage: + return self.list_items( + offset=offset, + limit=limit, clean=clean, desc=desc, fields=fields, @@ -293,13 +287,8 @@ def iterate_items( timeout=timeout, ) - yield from current_items_page.items - - current_page_item_count = len(current_items_page.items) - read_items += current_page_item_count - - if current_page_item_count < cache_size: - should_finish = True + # Default chunk size of 1000 keeps backwards compatibility with the previous fixed cache size. 
+ return get_items_iterator(_callback, limit=limit, offset=offset, chunk_size=chunk_size or 1000) def download_items( self, @@ -883,17 +872,18 @@ async def list_items( items=items, total=int(response.headers['x-apify-pagination-total']), offset=int(response.headers['x-apify-pagination-offset']), - # x-apify-pagination-count returns invalid values when hidden/empty items are skipped - count=len(items), + # x-apify-pagination-count returns count of processed items, not count of returned items + # This makes difference when items were filtered using hidden/empty + count=max(int(response.headers['x-apify-pagination-count']), len(items)), # API returns 999999999999 when no limit is used limit=int(response.headers['x-apify-pagination-limit']), desc=response.headers['x-apify-pagination-desc'].lower() == 'true', ) - async def iterate_items( + def iterate_items( self, *, - offset: int = 0, + offset: int | None = None, limit: int | None = None, clean: bool | None = None, desc: bool | None = None, @@ -903,10 +893,14 @@ async def iterate_items( skip_empty: bool | None = None, skip_hidden: bool | None = None, signature: str | None = None, + chunk_size: int | None = None, timeout: Timeout = 'long', ) -> AsyncIterator[dict]: """Iterate over the items in the dataset. + Simple `list_items` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items Args: @@ -934,29 +928,17 @@ async def iterate_items( skip_hidden: If True, then hidden fields are skipped from the output, i.e. fields starting with the # character. signature: Signature used to access the items. + chunk_size: Maximum number of items requested per API call when iterating across pages. timeout: Timeout for the API HTTP request. Yields: An item from the dataset. 
""" - cache_size = 1000 - - should_finish = False - read_items = 0 - - # We can't rely on DatasetItemsPage.total because that is updated with a delay, - # so if you try to read the dataset items right after a run finishes, you could miss some. - # Instead, we just read and read until we reach the limit, or until there are no more items to read. - while not should_finish: - effective_limit = cache_size - if limit is not None: - if read_items == limit: - break - effective_limit = min(cache_size, limit - read_items) - - current_items_page = await self.list_items( - offset=offset + read_items, - limit=effective_limit, + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> DatasetItemsPage: + return await self.list_items( + offset=offset, + limit=limit, clean=clean, desc=desc, fields=fields, @@ -968,14 +950,7 @@ async def iterate_items( timeout=timeout, ) - for item in current_items_page.items: - yield item - - current_page_item_count = len(current_items_page.items) - read_items += current_page_item_count - - if current_page_item_count < cache_size: - should_finish = True + return get_items_iterator_async(_callback, limit=limit, offset=offset, chunk_size=chunk_size or 1000) async def get_items_as_bytes( self, diff --git a/src/apify_client/_resource_clients/dataset_collection.py b/src/apify_client/_resource_clients/dataset_collection.py index d004e7c8..c194bd80 100644 --- a/src/apify_client/_resource_clients/dataset_collection.py +++ b/src/apify_client/_resource_clients/dataset_collection.py @@ -9,10 +9,14 @@ ListOfDatasets, ListOfDatasetsResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + from apify_client._literals import StorageOwnership + from apify_client._models import DatasetListItem from apify_client._types import Timeout @@ 
-71,6 +75,43 @@ def list( ) return ListOfDatasetsResponse.model_validate(result).data + def iterate( + self, + *, + unnamed: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + ownership: StorageOwnership | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[DatasetListItem]: + """Iterate over the available datasets. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/datasets/dataset-collection/get-list-of-datasets + + Args: + unnamed: Whether to include unnamed datasets in the list. + limit: How many datasets to retrieve. + offset: What dataset to include as first when retrieving the list. + desc: Whether to sort the datasets in descending order based on their modification date. + ownership: Filter by ownership. 'ownedByMe' returns only user's own datasets, + 'sharedWithMe' returns only datasets shared with the user. + timeout: Timeout for the API HTTP request. + + Yields: + The available datasets matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfDatasets: + return self.list( + unnamed=unnamed, limit=limit, offset=offset, desc=desc, ownership=ownership, timeout=timeout + ) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def get_or_create( self, *, @@ -149,6 +190,43 @@ async def list( ) return ListOfDatasetsResponse.model_validate(result).data + def iterate( + self, + *, + unnamed: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + ownership: StorageOwnership | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[DatasetListItem]: + """Iterate over the available datasets. 
+ + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/datasets/dataset-collection/get-list-of-datasets + + Args: + unnamed: Whether to include unnamed datasets in the list. + limit: How many datasets to retrieve. + offset: What dataset to include as first when retrieving the list. + desc: Whether to sort the datasets in descending order based on their modification date. + ownership: Filter by ownership. 'ownedByMe' returns only user's own datasets, + 'sharedWithMe' returns only datasets shared with the user. + timeout: Timeout for the API HTTP request. + + Yields: + The available datasets matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfDatasets: + return await self.list( + unnamed=unnamed, limit=limit, offset=offset, desc=desc, ownership=ownership, timeout=timeout + ) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def get_or_create( self, *, diff --git a/src/apify_client/_resource_clients/key_value_store.py b/src/apify_client/_resource_clients/key_value_store.py index 3ec8b8f7..e8dcc4d3 100644 --- a/src/apify_client/_resource_clients/key_value_store.py +++ b/src/apify_client/_resource_clients/key_value_store.py @@ -14,6 +14,7 @@ ListOfKeys, ListOfKeysResponse, ) +from apify_client._pagination import get_cursor_iterator, get_cursor_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync from apify_client._utils import ( catch_not_found_or_throw, @@ -183,53 +184,49 @@ def iterate_keys( self, *, limit: int | None = None, + exclusive_start_key: str | None = None, collection: str | None = None, prefix: str | None = None, signature: str | None = None, + chunk_size: int | None = None, timeout: 
Timeout = 'long', ) -> Iterator[KeyValueStoreKey]: """Iterate over the keys in the key-value store. + Simple `list_keys` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + https://docs.apify.com/api/v2#/reference/key-value-stores/key-collection/get-list-of-keys Args: limit: Maximum number of keys to return. By default there is no limit. + exclusive_start_key: All keys up to this one (inclusive) are skipped from the result. collection: The name of the collection in store schema to list keys from. prefix: The prefix of the keys to be listed. signature: Signature used to access the items. + chunk_size: Maximum number of keys requested per API call when iterating across pages. timeout: Timeout for the API HTTP request. Yields: A key from the key-value store. """ - cache_size = 1000 - read_keys = 0 - exclusive_start_key: str | None = None - - while True: - effective_limit = cache_size - if limit is not None: - if read_keys == limit: - break - effective_limit = min(cache_size, limit - read_keys) - - current_keys_page = self.list_keys( - limit=effective_limit, - exclusive_start_key=exclusive_start_key, + + def _callback(*, cursor: str | None = None, limit: int | None = None) -> ListOfKeys: + return self.list_keys( + limit=limit, + exclusive_start_key=cursor, collection=collection, prefix=prefix, signature=signature, timeout=timeout, ) - yield from current_keys_page.items - - read_keys += len(current_keys_page.items) - - if not current_keys_page.is_truncated: - break - - exclusive_start_key = current_keys_page.next_exclusive_start_key + return get_cursor_iterator( + _callback, + cursor=exclusive_start_key, + limit=limit, + chunk_size=chunk_size or 1000, + ) def get_record(self, key: str, *, signature: str | None = None, timeout: Timeout = 'long') -> dict | None: """Retrieve the given record from the key-value store.
@@ -609,58 +606,53 @@ async def list_keys( result = response_to_dict(response) return ListOfKeysResponse.model_validate(result).data - async def iterate_keys( + def iterate_keys( self, *, limit: int | None = None, + exclusive_start_key: str | None = None, collection: str | None = None, prefix: str | None = None, signature: str | None = None, + chunk_size: int | None = None, timeout: Timeout = 'long', ) -> AsyncIterator[KeyValueStoreKey]: """Iterate over the keys in the key-value store. + Simple `list_keys` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + https://docs.apify.com/api/v2#/reference/key-value-stores/key-collection/get-list-of-keys Args: limit: Maximum number of keys to return. By default there is no limit. + exclusive_start_key: All keys up to this one (inclusive) are skipped from the result. collection: The name of the collection in store schema to list keys from. prefix: The prefix of the keys to be listed. signature: Signature used to access the items. + chunk_size: Maximum number of keys requested per API call when iterating across pages. timeout: Timeout for the API HTTP request. Yields: A key from the key-value store.
""" - cache_size = 1000 - read_keys = 0 - exclusive_start_key: str | None = None - - while True: - effective_limit = cache_size - if limit is not None: - if read_keys == limit: - break - effective_limit = min(cache_size, limit - read_keys) - - current_keys_page = await self.list_keys( - limit=effective_limit, - exclusive_start_key=exclusive_start_key, + + async def _callback(*, cursor: str | None = None, limit: int | None = None) -> ListOfKeys: + return await self.list_keys( + limit=limit, + exclusive_start_key=cursor, collection=collection, prefix=prefix, signature=signature, timeout=timeout, ) - for key in current_keys_page.items: - yield key - - read_keys += len(current_keys_page.items) - - if not current_keys_page.is_truncated: - break - - exclusive_start_key = current_keys_page.next_exclusive_start_key + return get_cursor_iterator_async( + _callback, + cursor=exclusive_start_key, + limit=limit, + chunk_size=chunk_size or 1000, + ) async def get_record(self, key: str, *, signature: str | None = None, timeout: Timeout = 'long') -> dict | None: """Retrieve the given record from the key-value store. 
diff --git a/src/apify_client/_resource_clients/key_value_store_collection.py b/src/apify_client/_resource_clients/key_value_store_collection.py index 6c7add4e..b4bf506d 100644 --- a/src/apify_client/_resource_clients/key_value_store_collection.py +++ b/src/apify_client/_resource_clients/key_value_store_collection.py @@ -9,9 +9,12 @@ ListOfKeyValueStores, ListOfKeyValueStoresResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + from apify_client._literals import StorageOwnership from apify_client._types import Timeout @@ -71,6 +74,43 @@ def list( ) return ListOfKeyValueStoresResponse.model_validate(result).data + def iterate( + self, + *, + unnamed: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + ownership: StorageOwnership | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[KeyValueStore]: + """Iterate over the available key-value stores. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/key-value-stores/store-collection/get-list-of-key-value-stores + + Args: + unnamed: Whether to include unnamed key-value stores in the list. + limit: How many key-value stores to retrieve. + offset: What key-value store to include as first when retrieving the list. + desc: Whether to sort the key-value stores in descending order based on their modification date. + ownership: Filter by ownership. 'ownedByMe' returns only user's own key-value stores, + 'sharedWithMe' returns only key-value stores shared with the user. + timeout: Timeout for the API HTTP request. 
+ + Yields: + The available key-value stores matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfKeyValueStores: + return self.list( + unnamed=unnamed, limit=limit, offset=offset, desc=desc, ownership=ownership, timeout=timeout + ) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def get_or_create( self, *, @@ -149,6 +189,43 @@ async def list( ) return ListOfKeyValueStoresResponse.model_validate(result).data + def iterate( + self, + *, + unnamed: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + ownership: StorageOwnership | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[KeyValueStore]: + """Iterate over the available key-value stores. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/key-value-stores/store-collection/get-list-of-key-value-stores + + Args: + unnamed: Whether to include unnamed key-value stores in the list. + limit: How many key-value stores to retrieve. + offset: What key-value store to include as first when retrieving the list. + desc: Whether to sort the key-value stores in descending order based on their modification date. + ownership: Filter by ownership. 'ownedByMe' returns only user's own key-value stores, + 'sharedWithMe' returns only key-value stores shared with the user. + timeout: Timeout for the API HTTP request. + + Yields: + The available key-value stores matching the specified filters. 
+ """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfKeyValueStores: + return await self.list( + unnamed=unnamed, limit=limit, offset=offset, desc=desc, ownership=ownership, timeout=timeout + ) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def get_or_create( self, *, diff --git a/src/apify_client/_resource_clients/request_queue.py b/src/apify_client/_resource_clients/request_queue.py index c866944d..de6f7b6a 100644 --- a/src/apify_client/_resource_clients/request_queue.py +++ b/src/apify_client/_resource_clients/request_queue.py @@ -35,11 +35,13 @@ UnlockRequestsResponse, UnlockRequestsResult, ) +from apify_client._pagination import get_cursor_iterator, get_cursor_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync from apify_client._utils import catch_not_found_or_throw, response_to_dict, to_seconds from apify_client.errors import ApifyApiError if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator from datetime import timedelta from apify_client._literals import GeneralAccess @@ -546,6 +548,44 @@ def list_requests( result = response_to_dict(response) return ListOfRequestsResponse.model_validate(result).data + def iterate_requests( + self, + *, + limit: int | None = None, + filter: list[Literal['pending', 'locked']] | None = None, # noqa: A002 + cursor: str | None = None, + chunk_size: int | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[Request]: + """Iterate over requests in the queue. + + Simple `list_requests` does only one API call, possibly not listing all items matching the criteria. + This method returns an iterator that is capable of making multiple API calls to retrieve all items + matching the criteria using the opaque ``cursor`` returned by the API. 
+ + https://docs.apify.com/api/v2#/reference/request-queues/request-collection/list-requests + + Args: + limit: Maximum number of requests to yield across all pages. + filter: List of request states to use as a filter. Multiple values mean union of the given filters. + cursor: A token returned in a previous API response, used as the initial pagination cursor. + chunk_size: Maximum number of requests requested per API call when iterating across pages. + timeout: Timeout for the API HTTP request. + + Yields: + A request from the queue. + """ + + def _callback(*, cursor: str | None = None, limit: int | None = None) -> ListOfRequests: + return self.list_requests(limit=limit, filter=filter, cursor=cursor, timeout=timeout) + + return get_cursor_iterator( + _callback, + cursor=cursor, + limit=limit, + chunk_size=chunk_size, + ) + def unlock_requests(self: RequestQueueClient, *, timeout: Timeout = 'long') -> UnlockRequestsResult: """Unlock all requests in the queue, which were locked by the same clientKey or from the same Actor run. @@ -1120,6 +1160,44 @@ async def list_requests( result = response_to_dict(response) return ListOfRequestsResponse.model_validate(result).data + def iterate_requests( + self, + *, + limit: int | None = None, + filter: list[Literal['pending', 'locked']] | None = None, # noqa: A002 + cursor: str | None = None, + chunk_size: int | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[Request]: + """Iterate over requests in the queue. + + Simple `list_requests` does only one API call, possibly not listing all items matching the criteria. + This method returns an iterator that is capable of making multiple API calls to retrieve all items + matching the criteria using the opaque ``cursor`` returned by the API. + + https://docs.apify.com/api/v2#/reference/request-queues/request-collection/list-requests + + Args: + limit: Maximum number of requests to yield across all pages. + filter: List of request states to use as a filter. 
Multiple values mean union of the given filters. + cursor: A token returned in a previous API response, used as the initial pagination cursor. + chunk_size: Maximum number of requests requested per API call when iterating across pages. + timeout: Timeout for the API HTTP request. + + Yields: + A request from the queue. + """ + + async def _callback(*, cursor: str | None = None, limit: int | None = None) -> ListOfRequests: + return await self.list_requests(limit=limit, filter=filter, cursor=cursor, timeout=timeout) + + return get_cursor_iterator_async( + _callback, + cursor=cursor, + limit=limit, + chunk_size=chunk_size, + ) + async def unlock_requests( self: RequestQueueClientAsync, *, diff --git a/src/apify_client/_resource_clients/request_queue_collection.py b/src/apify_client/_resource_clients/request_queue_collection.py index ffd3aa2b..3239e30e 100644 --- a/src/apify_client/_resource_clients/request_queue_collection.py +++ b/src/apify_client/_resource_clients/request_queue_collection.py @@ -9,10 +9,14 @@ RequestQueue, RequestQueueResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + from apify_client._literals import StorageOwnership + from apify_client._models import RequestQueueShort from apify_client._types import Timeout @@ -71,6 +75,43 @@ def list( ) return ListOfRequestQueuesResponse.model_validate(result).data + def iterate( + self, + *, + unnamed: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + ownership: StorageOwnership | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[RequestQueueShort]: + """Iterate over the available request queues. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. 
This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/request-queues/queue-collection/get-list-of-request-queues + + Args: + unnamed: Whether to include unnamed request queues in the list. + limit: How many request queues to retrieve. + offset: What request queue to include as first when retrieving the list. + desc: Whether to sort the request queues in descending order based on their modification date. + ownership: Filter by ownership. 'ownedByMe' returns only user's own request queues, + 'sharedWithMe' returns only request queues shared with the user. + timeout: Timeout for the API HTTP request. + + Yields: + The available request queues matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfRequestQueues: + return self.list( + unnamed=unnamed, limit=limit, offset=offset, desc=desc, ownership=ownership, timeout=timeout + ) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def get_or_create( self, *, @@ -147,6 +188,43 @@ async def list( ) return ListOfRequestQueuesResponse.model_validate(result).data + def iterate( + self, + *, + unnamed: bool | None = None, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + ownership: StorageOwnership | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[RequestQueueShort]: + """Iterate over the available request queues. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/request-queues/queue-collection/get-list-of-request-queues + + Args: + unnamed: Whether to include unnamed request queues in the list. + limit: How many request queues to retrieve. 
+ offset: What request queue to include as first when retrieving the list. + desc: Whether to sort the request queues in descending order based on their modification date. + ownership: Filter by ownership. 'ownedByMe' returns only user's own request queues, + 'sharedWithMe' returns only request queues shared with the user. + timeout: Timeout for the API HTTP request. + + Yields: + The available request queues matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfRequestQueues: + return await self.list( + unnamed=unnamed, limit=limit, offset=offset, desc=desc, ownership=ownership, timeout=timeout + ) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def get_or_create( self, *, diff --git a/src/apify_client/_resource_clients/run_collection.py b/src/apify_client/_resource_clients/run_collection.py index db2c2180..21b7d186 100644 --- a/src/apify_client/_resource_clients/run_collection.py +++ b/src/apify_client/_resource_clients/run_collection.py @@ -4,12 +4,15 @@ from apify_client._docs import docs_group from apify_client._models import ListOfRuns, ListOfRunsResponse +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator from datetime import datetime from apify_client._literals import ActorJobStatus + from apify_client._models import RunShort from apify_client._types import Timeout @@ -76,6 +79,51 @@ def list( ) return ListOfRunsResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + status: ActorJobStatus | list[ActorJobStatus] | None = None, # ty: ignore[invalid-type-form] + started_before: str | datetime | None = None, + started_after: str | datetime | None = None, + timeout: 
Timeout = 'medium', + ) -> Iterator[RunShort]: + """Iterate over all Actor runs. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actors/run-collection/get-list-of-runs + https://docs.apify.com/api/v2#/reference/actor-runs/run-collection/get-user-runs-list + + Args: + limit: How many runs to retrieve. + offset: What run to include as first when retrieving the list. + desc: Whether to sort the runs in descending order based on their start date. + status: Retrieve only runs with the provided statuses. + started_before: Only return runs started before this date (inclusive). + started_after: Only return runs started after this date (inclusive). + timeout: Timeout for the API HTTP request. + + Yields: + The Actor runs matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfRuns: + return self.list( + limit=limit, + offset=offset, + desc=desc, + status=status, + started_before=started_before, + started_after=started_after, + timeout=timeout, + ) + + return get_items_iterator(_callback, limit=limit, offset=offset) + @docs_group('Resource clients') class RunCollectionClientAsync(ResourceClientAsync): @@ -139,3 +187,48 @@ async def list( startedAfter=started_after, ) return ListOfRunsResponse.model_validate(result).data + + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + status: ActorJobStatus | list[ActorJobStatus] | None = None, # ty: ignore[invalid-type-form] + started_before: str | datetime | None = None, + started_after: str | datetime | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[RunShort]: + """Iterate over all Actor runs. 
+ + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actors/run-collection/get-list-of-runs + https://docs.apify.com/api/v2#/reference/actor-runs/run-collection/get-user-runs-list + + Args: + limit: How many runs to retrieve. + offset: What run to include as first when retrieving the list. + desc: Whether to sort the runs in descending order based on their start date. + status: Retrieve only runs with the provided statuses. + started_before: Only return runs started before this date (inclusive). + started_after: Only return runs started after this date (inclusive). + timeout: Timeout for the API HTTP request. + + Yields: + The Actor runs matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfRuns: + return await self.list( + limit=limit, + offset=offset, + desc=desc, + status=status, + started_before=started_before, + started_after=started_after, + timeout=timeout, + ) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) diff --git a/src/apify_client/_resource_clients/schedule_collection.py b/src/apify_client/_resource_clients/schedule_collection.py index 94725724..138246e5 100644 --- a/src/apify_client/_resource_clients/schedule_collection.py +++ b/src/apify_client/_resource_clients/schedule_collection.py @@ -10,9 +10,13 @@ ScheduleCreate, ScheduleResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + + from apify_client._models import ScheduleShort from apify_client._types import Timeout @@ -59,6 +63,36 @@ def list( result = self._list(timeout=timeout, 
limit=limit, offset=offset, desc=desc) return ListOfSchedulesResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[ScheduleShort]: + """Iterate over the available schedules. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/schedules/schedules-collection/get-list-of-schedules + + Args: + limit: How many schedules to retrieve. + offset: What schedule to include as first when retrieving the list. + desc: Whether to sort the schedules in descending order based on their modification date. + timeout: Timeout for the API HTTP request. + + Yields: + The available schedules matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfSchedules: + return self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def create( self, *, @@ -152,6 +186,36 @@ async def list( result = await self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfSchedulesResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[ScheduleShort]: + """Iterate over the available schedules. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria.
+ + https://docs.apify.com/api/v2#/reference/schedules/schedules-collection/get-list-of-schedules + + Args: + limit: How many schedules to retrieve. + offset: What schedule to include as first when retrieving the list. + desc: Whether to sort the schedules in descending order based on their modification date. + timeout: Timeout for the API HTTP request. + + Yields: + The available schedules matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfSchedules: + return await self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def create( self, *, diff --git a/src/apify_client/_resource_clients/store_collection.py b/src/apify_client/_resource_clients/store_collection.py index ca6b0921..f1990ee1 100644 --- a/src/apify_client/_resource_clients/store_collection.py +++ b/src/apify_client/_resource_clients/store_collection.py @@ -4,9 +4,13 @@ from apify_client._docs import docs_group from apify_client._models import ListOfActorsInStoreResponse, ListOfStoreActors +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + + from apify_client._models import StoreListActor from apify_client._types import Timeout @@ -71,6 +75,54 @@ def list( ) return ListOfActorsInStoreResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + search: str | None = None, + sort_by: str | None = None, + category: str | None = None, + username: str | None = None, + pricing_model: str | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[StoreListActor]: + """Iterate over Actors in Apify store.
+ + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store + + Args: + limit: How many Actors to list. + offset: What Actor to include as first when retrieving the list. + search: String to search by. The search runs on the following fields: title, name, description, username, + readme. + sort_by: Specifies the field by which to sort the results. + category: Filter by this category. + username: Filter by this username. + pricing_model: Filter by this pricing model. + timeout: Timeout for the API HTTP request. + + Yields: + The Actors in the store matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfStoreActors: + return self.list( + limit=limit, + offset=offset, + search=search, + sort_by=sort_by, + category=category, + username=username, + pricing_model=pricing_model, + timeout=timeout, + ) + + return get_items_iterator(_callback, limit=limit, offset=offset) + @docs_group('Resource clients') class StoreCollectionClientAsync(ResourceClientAsync): @@ -132,3 +184,51 @@ async def list( pricingModel=pricing_model, ) return ListOfActorsInStoreResponse.model_validate(result).data + + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + search: str | None = None, + sort_by: str | None = None, + category: str | None = None, + username: str | None = None, + pricing_model: str | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[StoreListActor]: + """Iterate over Actors in Apify store. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. 
+ + https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store + + Args: + limit: How many Actors to list. + offset: What Actor to include as first when retrieving the list. + search: String to search by. The search runs on the following fields: title, name, description, username, + readme. + sort_by: Specifies the field by which to sort the results. + category: Filter by this category. + username: Filter by this username. + pricing_model: Filter by this pricing model. + timeout: Timeout for the API HTTP request. + + Yields: + The Actors in the store matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfStoreActors: + return await self.list( + limit=limit, + offset=offset, + search=search, + sort_by=sort_by, + category=category, + username=username, + pricing_model=pricing_model, + timeout=timeout, + ) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) diff --git a/src/apify_client/_resource_clients/task_collection.py b/src/apify_client/_resource_clients/task_collection.py index a1f1a15c..04b6223d 100644 --- a/src/apify_client/_resource_clients/task_collection.py +++ b/src/apify_client/_resource_clients/task_collection.py @@ -13,12 +13,15 @@ TaskOptions, TaskResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync from apify_client._utils import to_seconds if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator from datetime import timedelta + from apify_client._models import TaskShort from apify_client._typeddicts import TaskInputDict from apify_client._types import Timeout @@ -66,6 +69,36 @@ def list( result = self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfTasksResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = 
None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[TaskShort]: + """Iterate over the available tasks. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks + + Args: + limit: How many tasks to list. + offset: What task to include as first when retrieving the list. + desc: Whether to sort the tasks in descending order based on their creation date. + timeout: Timeout for the API HTTP request. + + Yields: + The available tasks matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfTasks: + return self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def create( self, *, @@ -187,6 +220,36 @@ async def list( result = await self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfTasksResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[TaskShort]: + """Iterate over the available tasks. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks + + Args: + limit: How many tasks to list. + offset: What task to include as first when retrieving the list. + desc: Whether to sort the tasks in descending order based on their creation date. + timeout: Timeout for the API HTTP request. 
+ + Yields: + The available tasks matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfTasks: + return await self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def create( self, *, diff --git a/src/apify_client/_resource_clients/webhook_collection.py b/src/apify_client/_resource_clients/webhook_collection.py index b8bd71bb..4c7b3cf7 100644 --- a/src/apify_client/_resource_clients/webhook_collection.py +++ b/src/apify_client/_resource_clients/webhook_collection.py @@ -10,11 +10,14 @@ WebhookCreate, WebhookResponse, ) +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + from apify_client._literals import WebhookEventType - from apify_client._models import Webhook + from apify_client._models import Webhook, WebhookShort from apify_client._types import Timeout @@ -61,6 +64,36 @@ def list( result = self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfWebhooksResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[WebhookShort]: + """Iterate over the available webhooks. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks + + Args: + limit: How many webhooks to retrieve. + offset: What webhook to include as first when retrieving the list. 
+ desc: Whether to sort the webhooks in descending order based on their date of creation. + timeout: Timeout for the API HTTP request. + + Yields: + The available webhooks matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfWebhooks: + return self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator(_callback, limit=limit, offset=offset) + def create( self, *, @@ -164,6 +197,36 @@ async def list( result = await self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfWebhooksResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[WebhookShort]: + """Iterate over the available webhooks. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks + + Args: + limit: How many webhooks to retrieve. + offset: What webhook to include as first when retrieving the list. + desc: Whether to sort the webhooks in descending order based on their date of creation. + timeout: Timeout for the API HTTP request. + + Yields: + The available webhooks matching the specified filters. 
+ """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfWebhooks: + return await self.list(limit=limit, offset=offset, desc=desc, timeout=timeout) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) + async def create( self, *, diff --git a/src/apify_client/_resource_clients/webhook_dispatch_collection.py b/src/apify_client/_resource_clients/webhook_dispatch_collection.py index d573a1ca..1d71c7cf 100644 --- a/src/apify_client/_resource_clients/webhook_dispatch_collection.py +++ b/src/apify_client/_resource_clients/webhook_dispatch_collection.py @@ -1,12 +1,16 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from apify_client._docs import docs_group from apify_client._models import ListOfWebhookDispatches, ListOfWebhookDispatchesResponse +from apify_client._pagination import get_items_iterator, get_items_iterator_async from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterator + + from apify_client._models import WebhookDispatch from apify_client._types import Timeout @@ -53,6 +57,36 @@ def list( result = self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfWebhookDispatchesResponse.model_validate(result).data + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> Iterator[WebhookDispatch]: + """Iterate over all webhook dispatches of a user. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. 
+ + https://docs.apify.com/api/v2#/reference/webhook-dispatches/webhook-dispatches-collection/get-list-of-webhook-dispatches + + Args: + limit: How many webhook dispatches to retrieve. + offset: What webhook dispatch to include as first when retrieving the list. + desc: Whether to sort the webhook dispatches in descending order based on the date of their creation. + timeout: Timeout for the API HTTP request. + + Yields: + The webhook dispatches of a user matching the specified filters. + """ + + def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfWebhookDispatches: + return cast('ListOfWebhookDispatches', self.list(limit=limit, offset=offset, desc=desc, timeout=timeout)) + + return get_items_iterator(_callback, limit=limit, offset=offset) + @docs_group('Resource clients') class WebhookDispatchCollectionClientAsync(ResourceClientAsync): @@ -96,3 +130,36 @@ async def list( """ result = await self._list(timeout=timeout, limit=limit, offset=offset, desc=desc) return ListOfWebhookDispatchesResponse.model_validate(result).data + + def iterate( + self, + *, + limit: int | None = None, + offset: int | None = None, + desc: bool | None = None, + timeout: Timeout = 'medium', + ) -> AsyncIterator[WebhookDispatch]: + """Iterate over all webhook dispatches of a user. + + Simple `list` does only one API call, possibly not listing all items matching the criteria. This method + returns an iterator that is capable of making multiple API calls to retrieve all items matching the criteria. + + https://docs.apify.com/api/v2#/reference/webhook-dispatches/webhook-dispatches-collection/get-list-of-webhook-dispatches + + Args: + limit: How many webhook dispatches to retrieve. + offset: What webhook dispatch to include as first when retrieving the list. + desc: Whether to sort the webhook dispatches in descending order based on the date of their creation. + timeout: Timeout for the API HTTP request. 
+ + Yields: + The webhook dispatches of a user matching the specified filters. + """ + + async def _callback(*, limit: int | None = None, offset: int | None = None) -> ListOfWebhookDispatches: + return cast( + 'ListOfWebhookDispatches', + await self.list(limit=limit, offset=offset, desc=desc, timeout=timeout), + ) + + return get_items_iterator_async(_callback, limit=limit, offset=offset) diff --git a/tests/unit/test_client_pagination.py b/tests/unit/test_client_pagination.py new file mode 100644 index 00000000..537b0e33 --- /dev/null +++ b/tests/unit/test_client_pagination.py @@ -0,0 +1,640 @@ +from __future__ import annotations + +import dataclasses +import json +import re +from typing import TYPE_CHECKING, Any, Literal, TypeAlias + +import pytest +from pydantic.fields import FieldInfo +from werkzeug import Response + +from apify_client import ApifyClient, ApifyClientAsync +from apify_client import _models as _models_module +from apify_client._resource_clients import ( + ActorCollectionClient, + ActorCollectionClientAsync, + ActorEnvVarCollectionClient, + ActorEnvVarCollectionClientAsync, + ActorVersionCollectionClient, + ActorVersionCollectionClientAsync, + BuildCollectionClient, + BuildCollectionClientAsync, + DatasetClient, + DatasetClientAsync, + DatasetCollectionClient, + DatasetCollectionClientAsync, + KeyValueStoreClient, + KeyValueStoreClientAsync, + KeyValueStoreCollectionClient, + KeyValueStoreCollectionClientAsync, + RequestQueueClient, + RequestQueueClientAsync, + RequestQueueCollectionClient, + RequestQueueCollectionClientAsync, + RunCollectionClient, + RunCollectionClientAsync, + ScheduleCollectionClient, + ScheduleCollectionClientAsync, + StoreCollectionClient, + StoreCollectionClientAsync, + TaskCollectionClient, + TaskCollectionClientAsync, + WebhookCollectionClient, + WebhookCollectionClientAsync, + WebhookDispatchCollectionClient, + WebhookDispatchCollectionClientAsync, +) + +if TYPE_CHECKING: + from collections.abc import Callable + + from 
_pytest.mark import ParameterSet + from pydantic import BaseModel + from pytest_httpserver import HTTPServer + from werkzeug import Request + + +CollectionClient: TypeAlias = ( + ActorCollectionClient + | BuildCollectionClient + | RunCollectionClient + | ScheduleCollectionClient + | TaskCollectionClient + | WebhookCollectionClient + | WebhookDispatchCollectionClient + | DatasetCollectionClient + | KeyValueStoreCollectionClient + | RequestQueueCollectionClient + | StoreCollectionClient + | ActorEnvVarCollectionClient + | ActorVersionCollectionClient +) + +CollectionClientAsync: TypeAlias = ( + ActorCollectionClientAsync + | BuildCollectionClientAsync + | RunCollectionClientAsync + | ScheduleCollectionClientAsync + | TaskCollectionClientAsync + | WebhookCollectionClientAsync + | WebhookDispatchCollectionClientAsync + | DatasetCollectionClientAsync + | KeyValueStoreCollectionClientAsync + | RequestQueueCollectionClientAsync + | StoreCollectionClientAsync + | ActorEnvVarCollectionClientAsync + | ActorVersionCollectionClientAsync +) + +ID_PLACEHOLDER = 'some-id' + + +# Inner list models whose `items: list[...]` annotation is relaxed to `list[dict]`. +# The point of these tests is the pagination mechanism, not validation of the item objects. +_RELAXED_LIST_MODELS = ( + 'ListOfActors', + 'ListOfBuilds', + 'ListOfDatasets', + 'ListOfEnvVars', + 'ListOfKeys', + 'ListOfKeyValueStores', + 'ListOfRequestQueues', + 'ListOfRequests', + 'ListOfRuns', + 'ListOfSchedules', + 'ListOfStoreActors', + 'ListOfTasks', + 'ListOfVersions', + 'ListOfWebhookDispatches', + 'ListOfWebhooks', +) + +# Outer wrappers that embed a relaxed list model via `.data`. Their compiled schema pins the +# inner's schema at construction time, so they need a forced rebuild to pick up the relaxation. +# The wrappers themselves are not mutated — their own field annotations stay as-is.
+_REBUILT_RESPONSE_WRAPPERS = ( + 'ListOfActorsInStoreResponse', + 'ListOfActorsResponse', + 'ListOfBuildsResponse', + 'ListOfDatasetsResponse', + 'ListOfEnvVarsResponse', + 'ListOfKeyValueStoresResponse', + 'ListOfKeysResponse', + 'ListOfRequestQueuesResponse', + 'ListOfRequestsResponse', + 'ListOfRunsResponse', + 'ListOfSchedulesResponse', + 'ListOfTasksResponse', + 'ListOfVersionsResponse', + 'ListOfWebhooksResponse', +) + + +@pytest.fixture(autouse=True) +def _relax_item_validation() -> Any: + """Relax only the element type of `items` on paginated list models for the test run. + + Pagination tests feed synthetic `{'id': N}` items that don't satisfy the real API schemas + (`ActorShort`, `BuildShort`, `Request`, `EnvVar`, …). Instead of bypassing validation + wholesale, each inner `ListOf*` model has its `items` field swapped to `list[dict]` + and rebuilt. Outer `.data` wrapping and every pagination-metadata field remain validated. + """ + relaxed_field = FieldInfo.from_annotation(list[dict]) + originals: dict[type[BaseModel], FieldInfo] = {} + wrappers = [getattr(_models_module, name) for name in _REBUILT_RESPONSE_WRAPPERS] + + for name in _RELAXED_LIST_MODELS: + cls = getattr(_models_module, name) + originals[cls] = cls.model_fields['items'] + cls.model_fields['items'] = relaxed_field + cls.model_rebuild(force=True) + for wrapper in wrappers: + wrapper.model_rebuild(force=True) + try: + yield + finally: + for cls, field in originals.items(): + cls.model_fields['items'] = field + cls.model_rebuild(force=True) + for wrapper in wrappers: + wrapper.model_rebuild(force=True) + + +def create_items(start: int, end: int, step: int | None = None) -> list[dict[str, int]]: + """Create a list of test items for the given index range.""" + if not step: + step = -1 if end < start else 1 + return [{'id': i} for i in range(start, end, step)] + + +NORMAL_ITEMS = 2500 +EXTRA_ITEMS_UNNAMED = 100 +MAX_ITEMS_PER_PAGE = 1000 + + +def _is_true(value: str | None) -> bool: + """Match 
the `'true'` wire form produced by the client's bool→string serialization.""" + return value == 'true' + + +def _parse_int_param(value: str | None) -> int: + return int(value) if value not in (None, '') else 0 + + +def _handle_offset_pagination(request: Request) -> Response: + """Serve an offset-paginated Apify API response. + + The simulated platform holds 2500 items normally and an additional 100 when + ``unnamed=true`` is requested. Pages are capped at 1000 items regardless of the requested + limit, mirroring the real API. The dataset items endpoint returns items as a raw list; + all other endpoints wrap them in ``{'data': {...}}``. + """ + params = request.args + + total_items = (NORMAL_ITEMS + EXTRA_ITEMS_UNNAMED) if _is_true(params.get('unnamed')) else NORMAL_ITEMS + offset = _parse_int_param(params.get('offset')) + limit = _parse_int_param(params.get('limit')) + assert offset >= 0, 'Invalid offset sent to API' + assert limit >= 0, 'Invalid limit sent to API' + + desc = _is_true(params.get('desc')) + items = create_items(total_items, 0) if desc else create_items(0, total_items) + + lower_index = min(offset, total_items) + upper_index = min(offset + (limit or total_items), total_items) + count = min(max(upper_index - lower_index, 0), MAX_ITEMS_PER_PAGE) + selected_items = items[lower_index : min(upper_index, lower_index + MAX_ITEMS_PER_PAGE)] + + # Every second item is filtered out when `skipEmpty=true`, `skipHidden=true`, or `clean=true`. 
+ if _is_true(params.get('skipEmpty')) or _is_true(params.get('skipHidden')) or _is_true(params.get('clean')): + selected_items = selected_items[::2] + + headers = { + 'x-apify-pagination-count': str(count), + 'x-apify-pagination-total': str(total_items), + 'x-apify-pagination-offset': str(offset), + 'x-apify-pagination-limit': str(limit or count or 1), + 'x-apify-pagination-desc': str(desc).lower(), + 'content-type': 'application/json', + } + + if request.path.endswith(f'/datasets/{ID_PLACEHOLDER}/items'): + body: Any = selected_items + else: + body = { + 'data': { + 'total': total_items, + 'count': count, + 'offset': offset, + 'limit': limit or (count or 1), + 'desc': desc, + 'items': selected_items, + } + } + return Response(status=200, headers=headers, response=json.dumps(body)) + + +def _handle_cursor_pagination(request: Request) -> Response: + """Serve a cursor-paginated Apify API response for KVS keys and RQ requests. + + Holds 2500 synthetic items whose integer `id` equals their position. Each page is capped + at 1000 items. KVS uses `exclusiveStartKey`; RQ accepts either the deprecated + `exclusiveStartId` on the initial call or the opaque `cursor` on subsequent calls. All + three values encode the last-seen item id as a string — the next page starts at id + 1. 
+ """ + params = request.args + limit = _parse_int_param(params.get('limit')) + assert limit >= 0, 'Invalid limit sent to API' + + cursor_raw = params.get('exclusiveStartKey') or params.get('exclusiveStartId') or params.get('cursor') + + total_items = NORMAL_ITEMS + start = int(cursor_raw) + 1 if cursor_raw not in (None, '') else 0 + end = total_items if not limit else min(start + limit, total_items) + page_end = min(end, start + MAX_ITEMS_PER_PAGE) + selected_items = [{'id': i} for i in range(start, page_end)] + + if request.path.endswith('/keys'): + is_truncated = page_end < total_items and bool(selected_items) + next_exclusive_start_key = str(selected_items[-1]['id']) if selected_items and is_truncated else None + body: dict[str, Any] = { + 'data': { + 'items': selected_items, + 'count': len(selected_items), + 'limit': limit or (len(selected_items) or 1), + 'is_truncated': is_truncated, + 'next_exclusive_start_key': next_exclusive_start_key, + } + } + else: # `/requests` + has_more = page_end < total_items and bool(selected_items) + next_cursor = str(selected_items[-1]['id']) if has_more else None + body = { + 'data': { + 'items': selected_items, + 'count': len(selected_items), + 'limit': limit or (len(selected_items) or 1), + 'next_cursor': next_cursor, + } + } + return Response(status=200, headers={'content-type': 'application/json'}, response=json.dumps(body)) + + +def _pagination_handler(request: Request) -> Response: + """Dispatch between cursor-based (KVS keys, RQ requests) and offset-based endpoints.""" + if request.path.endswith(('/keys', '/requests')): + return _handle_cursor_pagination(request) + return _handle_offset_pagination(request) + + +@pytest.fixture +def pagination_server(httpserver: HTTPServer) -> HTTPServer: + """Register a catch-all handler that mirrors the Apify paginated endpoints.""" + httpserver.expect_request(re.compile(r'.*')).respond_with_handler(_pagination_handler) + return httpserver + + +def _make_sync_client(httpserver: 
HTTPServer) -> ApifyClient: + return ApifyClient(token='test', api_url=httpserver.url_for('/')) + + +def _make_async_client(httpserver: HTTPServer) -> ApifyClientAsync: + return ApifyClientAsync(token='test', api_url=httpserver.url_for('/')) + + +# Map resource-client class name to a factory that, given an `ApifyClient`/`ApifyClientAsync`, +# returns the sub-client under test. Usable for both sync and async since every accessor is +# available symmetrically on both root clients. +_CLIENT_FACTORIES: dict[str, Callable[[Any], Any]] = { + 'ActorCollectionClient': lambda c: c.actors(), + 'ScheduleCollectionClient': lambda c: c.schedules(), + 'TaskCollectionClient': lambda c: c.tasks(), + 'WebhookCollectionClient': lambda c: c.webhooks(), + 'WebhookDispatchCollectionClient': lambda c: c.webhook_dispatches(), + 'StoreCollectionClient': lambda c: c.store(), + 'DatasetCollectionClient': lambda c: c.datasets(), + 'KeyValueStoreCollectionClient': lambda c: c.key_value_stores(), + 'RequestQueueCollectionClient': lambda c: c.request_queues(), + 'BuildCollectionClient': lambda c: c.actor(ID_PLACEHOLDER).builds(), + 'RunCollectionClient': lambda c: c.actor(ID_PLACEHOLDER).runs(), + 'ActorVersionCollectionClient': lambda c: c.actor(ID_PLACEHOLDER).versions(), + 'ActorEnvVarCollectionClient': lambda c: c.actor(ID_PLACEHOLDER).version('some-version').env_vars(), + 'DatasetClient': lambda c: c.dataset(ID_PLACEHOLDER), + 'KeyValueStoreClient': lambda c: c.key_value_store(ID_PLACEHOLDER), + 'RequestQueueClient': lambda c: c.request_queue(ID_PLACEHOLDER), +} + + +_CLIENT_SET_NAMES: dict[Literal['collection', 'dataset', 'kvs', 'rq'], tuple[str, ...]] = { + # Tuple rather than set: pytest-xdist requires a stable iteration order across workers. 
+ # https://pytest-xdist.readthedocs.io/en/stable/known-limitations.html#order-and-amount-of-test-must-be-consistent + 'collection': ( + 'ActorCollectionClient', + 'ScheduleCollectionClient', + 'TaskCollectionClient', + 'WebhookCollectionClient', + 'WebhookDispatchCollectionClient', + 'StoreCollectionClient', + 'DatasetCollectionClient', + 'KeyValueStoreCollectionClient', + 'RequestQueueCollectionClient', + 'BuildCollectionClient', + 'RunCollectionClient', + 'ActorVersionCollectionClient', + 'ActorEnvVarCollectionClient', + ), + 'dataset': ('DatasetClient',), + 'kvs': ('KeyValueStoreClient',), + 'rq': ('RequestQueueClient',), +} + + +@dataclasses.dataclass +class _PaginationCase: + """A single parametrized pagination test case.""" + + id: str + inputs: dict + expected_items: list[dict[str, int]] + supported_clients: set[str] + + def __hash__(self) -> int: + return hash(self.id) + + +COLLECTION_CLIENTS = { + 'ActorCollectionClient', + 'BuildCollectionClient', + 'RunCollectionClient', + 'ScheduleCollectionClient', + 'TaskCollectionClient', + 'WebhookCollectionClient', + 'WebhookDispatchCollectionClient', + 'DatasetCollectionClient', + 'KeyValueStoreCollectionClient', + 'RequestQueueCollectionClient', + 'StoreCollectionClient', +} + +NO_OPTIONS_CLIENTS = { + 'ActorEnvVarCollectionClient', + 'ActorVersionCollectionClient', +} + +DATASET_CLIENTS = {'DatasetClient'} +RQ_CLIENTS = {'RequestQueueClient'} +KVS_CLIENTS = {'KeyValueStoreClient'} +STORAGE_CLIENTS = DATASET_CLIENTS | RQ_CLIENTS | KVS_CLIENTS +OPTIONS_CLIENTS = COLLECTION_CLIENTS | STORAGE_CLIENTS + +TEST_CASES = ( + _PaginationCase('No options normal', {}, create_items(0, 2500), OPTIONS_CLIENTS), + # These clients can't iterate over all items if there is more of them than the API limit as they offer no + # pagination parameters. 
+ _PaginationCase('No options limited', {}, create_items(0, 1000), NO_OPTIONS_CLIENTS), + _PaginationCase('Limit', {'limit': 1100}, create_items(0, 1100), OPTIONS_CLIENTS), + _PaginationCase('Out of range limit', {'limit': 3000}, create_items(0, 2500), OPTIONS_CLIENTS), + _PaginationCase( + 'Offset', + {'offset': 1000}, + create_items(1000, 2500), + OPTIONS_CLIENTS - KVS_CLIENTS - RQ_CLIENTS, + ), + _PaginationCase( + 'Offset and limit', + {'offset': 1000, 'limit': 1100}, + create_items(1000, 2100), + OPTIONS_CLIENTS - KVS_CLIENTS - RQ_CLIENTS, + ), + _PaginationCase('Out of range offset', {'offset': 3000}, [], OPTIONS_CLIENTS - KVS_CLIENTS - RQ_CLIENTS), + _PaginationCase( + 'Offset, limit, descending', + {'offset': 1000, 'limit': 1100, 'desc': True}, + create_items(1500, 400), + OPTIONS_CLIENTS - {'StoreCollectionClient'} - KVS_CLIENTS - RQ_CLIENTS, + ), + _PaginationCase( + 'Offset, limit, descending, unnamed', + {'offset': 50, 'limit': 1100, 'desc': True, 'unnamed': True}, + create_items(2550, 1450), + {'DatasetCollectionClient', 'KeyValueStoreCollectionClient', 'RequestQueueCollectionClient'}, + ), + _PaginationCase( + 'chunk_size', + {'chunk_size': 100, 'limit': 250}, + create_items(0, 250), + STORAGE_CLIENTS, + ), + _PaginationCase( + 'Offset, limit, descending, chunk_size', + {'offset': 50, 'limit': 1100, 'desc': True, 'chunk_size': 100}, + create_items(2450, 1350), + DATASET_CLIENTS, + ), + _PaginationCase( + 'Offset, limit, descending, chunk_size, clean', + {'limit': 1500, 'chunk_size': 100, 'clean': True}, + # API behavior with `clean=True` is to apply the cleaning after pagination, so we end up with missing items + # being counted towards the limit and thus fewer total items returned. 
+ create_items(0, 1500, 2), + DATASET_CLIENTS, + ), + _PaginationCase( + 'Exclusive start key', + {'exclusive_start_key': '1000'}, + create_items(1001, 2500), + KVS_CLIENTS, + ), + _PaginationCase( + 'Exclusive start key and limit', + {'exclusive_start_key': '1000', 'limit': 500}, + create_items(1001, 1501), + KVS_CLIENTS, + ), + _PaginationCase( + 'Cursor', + {'cursor': '1000'}, + create_items(1001, 2500), + RQ_CLIENTS, + ), + _PaginationCase( + 'Cursor and limit', + {'cursor': '1000', 'limit': 500}, + create_items(1001, 1501), + RQ_CLIENTS, + ), +) + + +def _generate_test_params(client_set: Literal['collection', 'dataset', 'kvs', 'rq']) -> list[ParameterSet]: + """Build the pytest parameter set for the given client category. + + Each parameter carries the resource-client class name; the test body instantiates + the real client against the `httpserver` URL and looks up the factory in + `_CLIENT_FACTORIES`. + """ + client_names = _CLIENT_SET_NAMES[client_set] + return [ + pytest.param(test_case.inputs, test_case.expected_items, client_name, id=f'{client_name}:{test_case.id}') + for test_case in TEST_CASES + for client_name in client_names + if client_name in test_case.supported_clients + ] + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='collection'), +) +def test_client_list_iterable( + pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """Every sync collection client's `iterate()` should yield items across all pages.""" + client: CollectionClient = _CLIENT_FACTORIES[client_name](_make_sync_client(pagination_server)) + returned_items = list(client.iterate(**inputs)) + assert len(returned_items) == len(expected_items) + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='collection'), +) +async def test_client_list_iterable_async(
+ pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """Every async collection client's `iterate()` should yield items across all pages.""" + client: CollectionClientAsync = _CLIENT_FACTORIES[client_name](_make_async_client(pagination_server)) + returned_items = [item async for item in client.iterate(**inputs)] + assert len(returned_items) == len(expected_items) + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='dataset'), +) +def test_dataset_items_list_iterable( + pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """The sync dataset client's `iterate_items()` should iterate across pages.""" + client: DatasetClient = _CLIENT_FACTORIES[client_name](_make_sync_client(pagination_server)) + returned_items = list(client.iterate_items(**inputs)) + + if inputs == {}: + list_response = client.list_items(**inputs) + assert len(returned_items) == list_response.total + + assert len(returned_items) == len(expected_items) + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='dataset'), +) +async def test_dataset_items_list_iterable_async( + pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """The async dataset client's `iterate_items()` should iterate across pages.""" + client: DatasetClientAsync = _CLIENT_FACTORIES[client_name](_make_async_client(pagination_server)) + returned_items = [item async for item in client.iterate_items(**inputs)] + + if inputs == {}: + list_response = await client.list_items(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs',
'expected_items', 'client_name'), + _generate_test_params(client_set='kvs'), +) +def test_kvs_list_keys_iterable( + pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """The sync KVS client's `iterate_keys()` should iterate across cursor-paginated pages.""" + client: KeyValueStoreClient = _CLIENT_FACTORIES[client_name](_make_sync_client(pagination_server)) + returned_items = [dict(item) for item in client.iterate_keys(**inputs)] + + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='kvs'), +) +async def test_kvs_list_keys_iterable_async( + pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """The async KVS client's `iterate_keys()` should iterate across cursor-paginated pages.""" + client: KeyValueStoreClientAsync = _CLIENT_FACTORIES[client_name](_make_async_client(pagination_server)) + returned_items = [dict(item) async for item in client.iterate_keys(**inputs)] + + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='rq'), +) +def test_rq_list_requests_iterable( + pagination_server: HTTPServer, + client_name: str, + inputs: dict, + expected_items: list[dict[str, int]], +) -> None: + """The sync RQ client's `iterate_requests()` should iterate across cursor-paginated pages.""" + client: RequestQueueClient = _CLIENT_FACTORIES[client_name](_make_sync_client(pagination_server)) + returned_items = [dict(item) for item in client.iterate_requests(**inputs)] + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client_name'), + _generate_test_params(client_set='rq'), +) +async def test_rq_list_requests_iterable_async( + pagination_server: HTTPServer, + client_name: str, + inputs: 
dict, + expected_items: list[dict[str, int]], +) -> None: + """The async RQ client's `iterate_requests()` should iterate across cursor-paginated pages.""" + client: RequestQueueClientAsync = _CLIENT_FACTORIES[client_name](_make_async_client(pagination_server)) + returned_items = [dict(item) async for item in client.iterate_requests(**inputs)] + assert returned_items == expected_items + + +def test_rq_list_requests_rejects_cursor_and_exclusive_start_id() -> None: + """`cursor` and `exclusive_start_id` are mutually exclusive; passing both must raise `ValueError`.""" + client = ApifyClient(token='').request_queue(ID_PLACEHOLDER) + with pytest.raises(ValueError, match='Cannot use both'): + client.list_requests(cursor='a', exclusive_start_id='b') + + +async def test_rq_list_requests_rejects_cursor_and_exclusive_start_id_async() -> None: + """Async variant of the mutual-exclusion check.""" + client = ApifyClientAsync(token='').request_queue(ID_PLACEHOLDER) + with pytest.raises(ValueError, match='Cannot use both'): + await client.list_requests(cursor='a', exclusive_start_id='b')