Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 27 additions & 63 deletions src/obspec/_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class GetOptions(TypedDict, total=False):
"""
Request transfer of only the specified range of bytes.

The semantics of this tuple are:
The semantics of this attribute are:

- `(int, int)`: Request a specific range of bytes `(start, end)`.

Expand Down Expand Up @@ -130,18 +130,18 @@ class GetOptions(TypedDict, total=False):
class GetResult(Protocol):
"""Result for a get request.

You can materialize the entire buffer by using `bytes`, or you can stream the result
using `stream`. `__iter__` is implemented as an alias to `stream`, so you can
alternatively call `iter()` on `GetResult` to start an iterator.
You can materialize the entire buffer by calling the `bytes` method or you can
stream the result by iterating over it .

**Example:**

```py
resp = obs.get(store, path)
# 20MB chunk size in stream
stream = resp.stream(min_chunk_size=20 * 1024 * 1024)
for buf in stream:
print(len(buf))
from obspec import Get

def streaming_download(client: Get, path: str):
resp = client.get(path)
for buffer in resp:
print(len(buffer))
```
"""

Expand Down Expand Up @@ -171,43 +171,26 @@ def range(self) -> tuple[int, int]:
"""
...

def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BufferIterator:
r"""Return a chunked stream over the result's bytes.

Args:
min_chunk_size: The minimum size in bytes for each chunk in the returned
`BufferStream`. All chunks except for the last chunk will be at least
this size. Defaults to 10\*1024\*1024 (10MB).

Returns:
A chunked stream

"""
...

def __iter__(self) -> BufferStream:
"""Return a chunked stream over the result's bytes.

Uses the default (10MB) chunk size.
"""
def __iter__(self) -> BufferIterator:
"""Return a chunked stream over the result's bytes."""
...


class GetResultAsync(Protocol):
"""Result for an async get request.

You can materialize the entire buffer by using `bytes_async`, or you can stream the
result using `stream`. `__aiter__` is implemented as an alias to `stream`, so you
can alternatively call `aiter()` on `GetResult` to start an iterator.
You can materialize the entire buffer by calling the `bytes_async` method or you can
stream the result by asynchronously iterating over it.

**Example:**

```py
resp = await obs.get_async(store, path)
# 5MB chunk size in stream
stream = resp.stream(min_chunk_size=5 * 1024 * 1024)
async for buf in stream:
print(len(buf))
from obspec import GetAsync

async def streaming_download(obs: GetAsync, path: str):
resp = await client.get_async(path)
async for buffer in resp:
print(len(buffer))
```
"""

Expand Down Expand Up @@ -238,20 +221,6 @@ def range(self) -> tuple[int, int]:
"""
...

def stream(self, min_chunk_size: int = 10 * 1024 * 1024) -> BufferStream:
r"""Return a chunked stream over the result's bytes.

Args:
min_chunk_size: The minimum size in bytes for each chunk in the returned
`BufferStream`. All chunks except for the last chunk will be at least
this size. Defaults to 10\*1024\*1024 (10MB).

Returns:
A chunked stream

"""
...

def __aiter__(self) -> BufferStream:
"""Return a chunked stream over the result's bytes.

Expand Down Expand Up @@ -294,7 +263,7 @@ def get(
"""Return the bytes that are stored at the specified location.

Args:
path: The path within ObjectStore to retrieve.
path: The path within the store to retrieve.
options: options for accessing the file. Defaults to None.

Returns:
Expand Down Expand Up @@ -335,7 +304,7 @@ def get_range(
exact requested range will be returned.

Args:
path: The path within ObjectStore to retrieve.
path: The path within the store to retrieve.

Keyword Args:
start: The start of the byte range.
Expand All @@ -345,8 +314,7 @@ def get_range(
be non-None.

Returns:
A `Buffer` object implementing the Python buffer protocol, allowing
zero-copy access to the underlying memory provided by Rust.
A `Buffer` object implementing the Python buffer protocol.

"""
...
Expand Down Expand Up @@ -379,14 +347,11 @@ def get_ranges(
) -> Sequence[Buffer]:
"""Return the bytes stored at the specified location in the given byte ranges.

To improve performance this will:

- Transparently combine ranges less than 1MB apart into a single underlying
request
- Make multiple `fetch` requests in parallel (up to maximum of 10)
The choice of how to implement multiple range requests is implementation
specific.

Args:
path: The path within ObjectStore to retrieve.
path: The path within the store to retrieve.

Other Args:
starts: A sequence of `int` where each offset starts.
Expand All @@ -396,9 +361,8 @@ def get_ranges(
Either `ends` or `lengths` must be non-None.

Returns:
A sequence of `Buffer`, one for each range. This `Buffer` object implements
the Python buffer protocol, allowing zero-copy access to the underlying
memory provided by Rust.
A sequence of `Buffer`, one for each range, each implementing the Python
buffer protocol.

"""
...
Expand Down
2 changes: 1 addition & 1 deletion src/obspec/_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def head(self, path: str) -> ObjectMeta:
"""Return the metadata for the specified location.

Args:
path: The path within ObjectStore to retrieve.
path: The path within the store to retrieve.

Returns:
ObjectMeta
Expand Down
26 changes: 9 additions & 17 deletions src/obspec/_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ class ListResult(TypedDict, Generic[ListChunkType_co]):


class ListIterator(Protocol[ListChunkType_co]):
"""A stream of [ObjectMeta][obspec.ObjectMeta] that can be polled synchronously."""
"""An iterator of [ObjectMeta][obspec.ObjectMeta] that can be polled synchronously.""" # noqa: E501

def __iter__(self) -> Self:
"""Return `Self` as an async iterator."""
"""Return `Self` as an iterator."""
...

def __next__(self) -> ListChunkType_co:
"""Return the next chunk of ObjectMeta in the stream."""
"""Return the next chunk of ObjectMeta in the iterator."""
...


Expand All @@ -72,7 +72,6 @@ def list(
prefix: str | None = None,
*,
offset: str | None = None,
chunk_size: int = 50,
) -> ListIterator[Sequence[ObjectMeta]]:
"""List all the objects with the given prefix.

Expand All @@ -92,7 +91,7 @@ def upload_files(client: obspec.Put):
client.put(f"file{i}.txt", b"foo")

def list_files(client: obspec.List):
stream = client.list(chunk_size=10)
stream = client.list()
for list_result in stream:
print(list_result[0])
# {'path': 'file0.txt', 'last_modified': datetime.datetime(2024, 10, 23, 19, 19, 28, 781723, tzinfo=datetime.timezone.utc), 'size': 3, 'e_tag': '0', 'version': None}
Expand All @@ -104,17 +103,14 @@ def list_files(client: obspec.List):
guaranteed

Args:
prefix: The prefix within ObjectStore to use for listing. Defaults to None.
prefix: The prefix within the store to use for listing. Defaults to None.

Keyword Args:
offset: If provided, list all the objects with the given prefix and a
location greater than `offset`. Defaults to `None`.
chunk_size: The number of items to collect per chunk in the returned
(async) iterator. All chunks except for the last one will have this many
items.

Returns:
A ListStream, which you can iterate through to access list results.
A ListIterator, which you can iterate through to access list results.

""" # noqa: E501
...
Expand All @@ -126,7 +122,6 @@ def list_async(
prefix: str | None = None,
*,
offset: str | None = None,
chunk_size: int = 50,
) -> ListStream[Sequence[ObjectMeta]]:
"""List all the objects with the given prefix.

Expand All @@ -140,7 +135,7 @@ def list_async(
Asynchronously iterate through list results. Just change `for` to `async for`:

```py
stream = obs.list_async(store, chunk_size=10)
stream = obs.list_async(store)
async for list_result in stream:
print(list_result[2])
# {'path': 'file10.txt', 'last_modified': datetime.datetime(2024, 10, 23, 19, 21, 46, 224725, tzinfo=datetime.timezone.utc), 'size': 3, 'e_tag': '10', 'version': None}
Expand All @@ -152,14 +147,11 @@ def list_async(
guaranteed

Args:
prefix: The prefix within ObjectStore to use for listing. Defaults to None.
prefix: The prefix within the store to use for listing. Defaults to None.

Keyword Args:
offset: If provided, list all the objects with the given prefix and a
location greater than `offset`. Defaults to `None`.
chunk_size: The number of items to collect per chunk in the returned
(async) iterator. All chunks except for the last one will have this many
items.

Returns:
A ListStream, which you can iterate through to access list results.
Expand Down Expand Up @@ -189,7 +181,7 @@ def list_with_delimiter(
the paths in the result.

Args:
prefix: The prefix within ObjectStore to use for listing. Defaults to None.
prefix: The prefix within the store to use for listing. Defaults to None.

Returns:
ListResult
Expand Down
46 changes: 17 additions & 29 deletions src/obspec/_put.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,34 +84,16 @@ def put( # noqa: PLR0913
mode: PutMode | None = None,
use_multipart: bool | None = None,
chunk_size: int = ...,
max_concurrency: int = 12,
max_concurrency: int = ...,
) -> PutResult:
"""Save the provided bytes to the specified location.

The operation is guaranteed to be atomic, it will either successfully write the
entirety of `file` to `location`, or fail. No clients should be able to observe
a partially written object.

!!! warning "Aborted multipart uploads"
This function will automatically use [multipart
uploads](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html)
under the hood for large file objects (whenever the length of the file is
greater than `chunk_size`) or for iterable or async iterable input.

Multipart uploads have a variety of advantages, including performance and
reliability.

However, aborted or incomplete multipart uploads can leave partial content
in a hidden state in your bucket, silently adding to your storage costs.
It's recommended to configure lifecycle rules to automatically delete
aborted multipart uploads. See
[here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html)
for the AWS S3 documentation, for example.

You can turn off multipart uploads by passing `use_multipart=False`.

Args:
path: The path within ObjectStore for where to save the file.
path: The path within the store for where to save the file.
file: The object to upload. Supports various input:

- A file-like object opened in binary read mode
Expand All @@ -131,14 +113,21 @@ def put( # noqa: PLR0913
be performed. Defaults to `"overwrite"`.
attributes: Provide a set of `Attributes`. Defaults to `None`.
tags: Provide tags for this object. Defaults to `None`.
use_multipart: Whether to use a multipart upload under the hood. Defaults
using a multipart upload if the length of the file is greater than
`chunk_size`. When `use_multipart` is `False`, the entire input will be
materialized in memory as part of the upload.
use_multipart: Whether to force using a multipart upload.

If `True`, the upload will always use a multipart upload, even if the
length of the file is less than `chunk_size`. If `False`, the upload
will never use a multipart upload, and the entire input will be
materialized in memory as part of the upload. If `None`, the
implementation will choose whether to use a multipart upload based on
the length of the file and `chunk_size`.

Defaults to `None`.
chunk_size: The size of chunks to use within each part of the multipart
upload. Defaults to 5 MB.
max_concurrency: The maximum number of chunks to upload concurrently.
Defaults to 12.
upload. The default is allowed to be implementation-specific.
max_concurrency: The maximum number of chunks to upload concurrently. This
impacts the memory usage of large file uploads. The default is allowed
to be implementation-specific.

"""
...
Expand All @@ -162,7 +151,7 @@ async def put_async( # noqa: PLR0913
mode: PutMode | None = None,
use_multipart: bool | None = None,
chunk_size: int = ...,
max_concurrency: int = 12,
max_concurrency: int = ...,
) -> PutResult:
"""Call `put` asynchronously.

Expand All @@ -177,7 +166,6 @@ async def put_async( # noqa: PLR0913
```py
from obspec import GetAsync, PutAsync


async def streaming_copy(
fetch_client: GetAsync,
put_client: PutAsync,
Expand Down
3 changes: 1 addition & 2 deletions tests/test-get.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@
def accepts_get(client: Get) -> None:
resp = client.get("path/to/file")
assert_type(resp.range, tuple[int, int])
buffer_iterator = resp.stream()
for chunk in buffer_iterator:
for chunk in resp:
assert_type(chunk, Buffer)
Loading