From eacb838ff8444a5ea0a5c74bbdc2813af1503fb1 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 28 May 2025 14:58:47 -0400 Subject: [PATCH 1/4] feat: Remove chunk_size parameter --- src/obspec/_get.py | 88 ++++++++++++++------------------------------- src/obspec/_head.py | 2 +- src/obspec/_list.py | 20 ++++------- src/obspec/_put.py | 46 +++++++++--------------- 4 files changed, 50 insertions(+), 106 deletions(-) diff --git a/src/obspec/_get.py b/src/obspec/_get.py index 083fa6f..a0857de 100644 --- a/src/obspec/_get.py +++ b/src/obspec/_get.py @@ -92,7 +92,7 @@ class GetOptions(TypedDict, total=False): """ Request transfer of only the specified range of bytes. - The semantics of this tuple are: + The semantics of this attribute are: - `(int, int)`: Request a specific range of bytes `(start, end)`. @@ -130,18 +130,18 @@ class GetOptions(TypedDict, total=False): class GetResult(Protocol): """Result for a get request. - You can materialize the entire buffer by using `bytes`, or you can stream the result - using `stream`. `__iter__` is implemented as an alias to `stream`, so you can - alternatively call `iter()` on `GetResult` to start an iterator. + You can materialize the entire buffer by calling the `bytes` method or you can + stream the result by iterating over it . **Example:** ```py - resp = obs.get(store, path) - # 20MB chunk size in stream - stream = resp.stream(min_chunk_size=20 * 1024 * 1024) - for buf in stream: - print(len(buf)) + from obspec import Get + + def streaming_download(client: Get, path: str): + resp = client.get(path) + for buffer in resp: + print(len(buffer)) ``` """ @@ -171,43 +171,26 @@ def range(self) -> tuple[int, int]: """ ... - def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BufferIterator: - r"""Return a chunked stream over the result's bytes. - - Args: - min_chunk_size: The minimum size in bytes for each chunk in the returned - `BufferStream`. All chunks except for the last chunk will be at least - this size. Defaults to 10\*1024\*1024 (10MB). - - Returns: - A chunked stream - - """ - ... - def __iter__(self) -> BufferStream: - """Return a chunked stream over the result's bytes. - - Uses the default (10MB) chunk size. - """ + """Return a chunked stream over the result's bytes.""" ... class GetResultAsync(Protocol): """Result for an async get request. - You can materialize the entire buffer by using `bytes_async`, or you can stream the - result using `stream`. `__aiter__` is implemented as an alias to `stream`, so you - can alternatively call `aiter()` on `GetResult` to start an iterator. + You can materialize the entire buffer by calling the `bytes_async` method or you can + stream the result by asynchronously iterating over it. **Example:** ```py - resp = await obs.get_async(store, path) - # 5MB chunk size in stream - stream = resp.stream(min_chunk_size=5 * 1024 * 1024) - async for buf in stream: - print(len(buf)) + from obspec import GetAsync + + async def streaming_download(obs: GetAsync, path: str): + resp = await client.get_async(path) + async for buffer in resp: + print(len(buffer)) ``` """ @@ -238,20 +221,6 @@ def range(self) -> tuple[int, int]: """ ... - def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BufferStream: - r"""Return a chunked stream over the result's bytes. - - Args: - min_chunk_size: The minimum size in bytes for each chunk in the returned - `BufferStream`. All chunks except for the last chunk will be at least - this size. Defaults to 10\*1024\*1024 (10MB). - - Returns: - A chunked stream - - """ - ... - def __aiter__(self) -> BufferStream: """Return a chunked stream over the result's bytes. @@ -294,7 +263,7 @@ def get( """Return the bytes that are stored at the specified location. Args: - path: The path within ObjectStore to retrieve. + path: The path within the store to retrieve. options: options for accessing the file. Defaults to None. Returns: @@ -335,7 +304,7 @@ def get_range( exact requested range will be returned. Args: - path: The path within ObjectStore to retrieve. + path: The path within the store to retrieve. Keyword Args: start: The start of the byte range. @@ -345,8 +314,7 @@ def get_range( be non-None. Returns: - A `Buffer` object implementing the Python buffer protocol, allowing - zero-copy access to the underlying memory provided by Rust. + A `Buffer` object implementing the Python buffer protocol. """ ... @@ -379,14 +347,11 @@ def get_ranges( ) -> Sequence[Buffer]: """Return the bytes stored at the specified location in the given byte ranges. - To improve performance this will: - - - Transparently combine ranges less than 1MB apart into a single underlying - request - - Make multiple `fetch` requests in parallel (up to maximum of 10) + The choice of how to implement multiple range requests is implementation + specific. Args: - path: The path within ObjectStore to retrieve. + path: The path within the store to retrieve. Other Args: starts: A sequence of `int` where each offset starts. @@ -396,9 +361,8 @@ def get_ranges( Either `ends` or `lengths` must be non-None. Returns: - A sequence of `Buffer`, one for each range. This `Buffer` object implements - the Python buffer protocol, allowing zero-copy access to the underlying - memory provided by Rust. + A sequence of `Buffer`, one for each range, each implementing the Python + buffer protocol. """ ... diff --git a/src/obspec/_head.py b/src/obspec/_head.py index bf527fb..70bf113 100644 --- a/src/obspec/_head.py +++ b/src/obspec/_head.py @@ -11,7 +11,7 @@ def head(self, path: str) -> ObjectMeta: """Return the metadata for the specified location. Args: - path: The path within ObjectStore to retrieve. + path: The path within the store to retrieve. Returns: ObjectMeta diff --git a/src/obspec/_list.py b/src/obspec/_list.py index 75332b8..0fa59d5 100644 --- a/src/obspec/_list.py +++ b/src/obspec/_list.py @@ -72,7 +72,6 @@ def list( prefix: str | None = None, *, offset: str | None = None, - chunk_size: int = 50, ) -> ListIterator[Sequence[ObjectMeta]]: """List all the objects with the given prefix. @@ -92,7 +91,7 @@ def upload_files(client: obspec.Put): client.put(f"file{i}.txt", b"foo") def list_files(client: obspec.List): - stream = client.list(chunk_size=10) + stream = client.list() for list_result in stream: print(list_result[0]) # {'path': 'file0.txt', 'last_modified': datetime.datetime(2024, 10, 23, 19, 19, 28, 781723, tzinfo=datetime.timezone.utc), 'size': 3, 'e_tag': '0', 'version': None} @@ -104,17 +103,14 @@ def list_files(client: obspec.List): guaranteed Args: - prefix: The prefix within ObjectStore to use for listing. Defaults to None. + prefix: The prefix within the store to use for listing. Defaults to None. Keyword Args: offset: If provided, list all the objects with the given prefix and a location greater than `offset`. Defaults to `None`. - chunk_size: The number of items to collect per chunk in the returned - (async) iterator. All chunks except for the last one will have this many - items. Returns: - A ListStream, which you can iterate through to access list results. + A ListIterator, which you can iterate through to access list results. """ # noqa: E501 ... @@ -126,7 +122,6 @@ def list_async( prefix: str | None = None, *, offset: str | None = None, - chunk_size: int = 50, ) -> ListStream[Sequence[ObjectMeta]]: """List all the objects with the given prefix. @@ -140,7 +135,7 @@ def list_async( Asynchronously iterate through list results. Just change `for` to `async for`: ```py - stream = obs.list_async(store, chunk_size=10) + stream = obs.list_async(store) async for list_result in stream: print(list_result[2]) # {'path': 'file10.txt', 'last_modified': datetime.datetime(2024, 10, 23, 19, 21, 46, 224725, tzinfo=datetime.timezone.utc), 'size': 3, 'e_tag': '10', 'version': None} @@ -152,14 +147,11 @@ def list_async( guaranteed Args: - prefix: The prefix within ObjectStore to use for listing. Defaults to None. + prefix: The prefix within the store to use for listing. Defaults to None. Keyword Args: offset: If provided, list all the objects with the given prefix and a location greater than `offset`. Defaults to `None`. - chunk_size: The number of items to collect per chunk in the returned - (async) iterator. All chunks except for the last one will have this many - items. Returns: A ListStream, which you can iterate through to access list results. @@ -189,7 +181,7 @@ def list_with_delimiter( the paths in the result. Args: - prefix: The prefix within ObjectStore to use for listing. Defaults to None. + prefix: The prefix within the store to use for listing. Defaults to None. Returns: ListResult diff --git a/src/obspec/_put.py b/src/obspec/_put.py index 1dcf3c1..3591cdb 100644 --- a/src/obspec/_put.py +++ b/src/obspec/_put.py @@ -84,7 +84,7 @@ def put( # noqa: PLR0913 mode: PutMode | None = None, use_multipart: bool | None = None, chunk_size: int = ..., - max_concurrency: int = 12, + max_concurrency: int = ..., ) -> PutResult: """Save the provided bytes to the specified location. @@ -92,26 +92,8 @@ def put( # noqa: PLR0913 entirety of `file` to `location`, or fail. No clients should be able to observe a partially written object. - !!! warning "Aborted multipart uploads" - This function will automatically use [multipart - uploads](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) - under the hood for large file objects (whenever the length of the file is - greater than `chunk_size`) or for iterable or async iterable input. - - Multipart uploads have a variety of advantages, including performance and - reliability. - - However, aborted or incomplete multipart uploads can leave partial content - in a hidden state in your bucket, silently adding to your storage costs. - It's recommended to configure lifecycle rules to automatically delete - aborted multipart uploads. See - [here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html) - for the AWS S3 documentation, for example. - - You can turn off multipart uploads by passing `use_multipart=False`. - Args: - path: The path within ObjectStore for where to save the file. + path: The path within the store for where to save the file. file: The object to upload. Supports various input: - A file-like object opened in binary read mode @@ -131,14 +113,21 @@ def put( # noqa: PLR0913 be performed. Defaults to `"overwrite"`. attributes: Provide a set of `Attributes`. Defaults to `None`. tags: Provide tags for this object. Defaults to `None`. - use_multipart: Whether to use a multipart upload under the hood. Defaults - using a multipart upload if the length of the file is greater than - `chunk_size`. When `use_multipart` is `False`, the entire input will be - materialized in memory as part of the upload. + use_multipart: Whether to force using a multipart upload. + + If `True`, the upload will always use a multipart upload, even if the + length of the file is less than `chunk_size`. If `False`, the upload + will never use a multipart upload, and the entire input will be + materialized in memory as part of the upload. If `None`, the + implementation will choose whether to use a multipart upload based on + the length of the file and `chunk_size`. + + Defaults to `None`. chunk_size: The size of chunks to use within each part of the multipart - upload. Defaults to 5 MB. - max_concurrency: The maximum number of chunks to upload concurrently. - Defaults to 12. + upload. The default is allowed to be implementation-specific. + max_concurrency: The maximum number of chunks to upload concurrently. This + impacts the memory usage of large file uploads. The default is allowed + to be implementation-specific. """ ... @@ -162,7 +151,7 @@ async def put_async( # noqa: PLR0913 mode: PutMode | None = None, use_multipart: bool | None = None, chunk_size: int = ..., - max_concurrency: int = 12, + max_concurrency: int = ..., ) -> PutResult: """Call `put` asynchronously. @@ -177,7 +166,6 @@ async def put_async( # noqa: PLR0913 ```py from obspec import GetAsync, PutAsync - async def streaming_copy( fetch_client: GetAsync, put_client: PutAsync, From a42135607a6ee7d35978515064786d540505a577 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 28 May 2025 15:00:44 -0400 Subject: [PATCH 2/4] fix test --- tests/test-get.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test-get.yml b/tests/test-get.yml index ab14905..3e9abb4 100644 --- a/tests/test-get.yml +++ b/tests/test-get.yml @@ -16,6 +16,5 @@ def accepts_get(client: Get) -> None: resp = client.get("path/to/file") assert_type(resp.range, tuple[int, int]) - buffer_iterator = resp.stream() - for chunk in buffer_iterator: + for chunk in resp: assert_type(chunk, Buffer) From 1a7f3f915e24c2604d9fb72de437e7df4f3ab80a Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 28 May 2025 15:03:25 -0400 Subject: [PATCH 3/4] Fix GetResult to return BufferIterator --- src/obspec/_get.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/obspec/_get.py b/src/obspec/_get.py index a0857de..ad36862 100644 --- a/src/obspec/_get.py +++ b/src/obspec/_get.py @@ -171,7 +171,7 @@ def range(self) -> tuple[int, int]: """ ... - def __iter__(self) -> BufferStream: + def __iter__(self) -> BufferIterator: """Return a chunked stream over the result's bytes.""" ... From ed46e331704360f1bf44a2804dbf67afaff85670 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 28 May 2025 15:05:53 -0400 Subject: [PATCH 4/4] docstring updates --- src/obspec/_list.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/obspec/_list.py b/src/obspec/_list.py index 0fa59d5..f59fc0e 100644 --- a/src/obspec/_list.py +++ b/src/obspec/_list.py @@ -43,14 +43,14 @@ class ListResult(TypedDict, Generic[ListChunkType_co]): class ListIterator(Protocol[ListChunkType_co]): - """A stream of [ObjectMeta][obspec.ObjectMeta] that can be polled synchronously.""" + """An iterator of [ObjectMeta][obspec.ObjectMeta] that can be polled synchronously.""" # noqa: E501 def __iter__(self) -> Self: - """Return `Self` as an async iterator.""" + """Return `Self` as an iterator.""" ... def __next__(self) -> ListChunkType_co: - """Return the next chunk of ObjectMeta in the stream.""" + """Return the next chunk of ObjectMeta in the iterator.""" ...