Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions changes/3612.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Added ``copy_to`` convenience methods to ``zarr.Group`` and ``zarr.Array`` (and their async
counterparts) for copying a node, or a whole group hierarchy, to a destination store that may be
of a different type than the source. The copy is a raw byte-level transfer of the metadata
documents and stored chunks/shards (chunk bytes are not decoded and re-encoded), and reproduces
the source keys exactly, including consolidated metadata. The new ``write_order`` argument (and
the ``copy.write_order`` configuration value) controls whether data or metadata is written first,
so an interrupted copy can be made to never expose an index that points at data which is not yet
present.

The copy streams the source store rather than materializing the chunk grid, so it works on arrays
with arbitrarily many chunks in bounded memory. This is supported by a new
``ChunkKeyEncoding.is_chunk_key`` method (and ``ArrayV2Metadata.is_chunk_key`` /
``ArrayV3Metadata.is_chunk_key``) that recognizes whether a store key is a valid chunk/shard key
for an array in O(1), without enumerating the grid.
23 changes: 23 additions & 0 deletions docs/user-guide/groups.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,26 @@ Groups also have the [`zarr.Group.tree`][] method, e.g.:
print(root.tree())
```

!!! note
[`zarr.Group.tree`][] requires the optional [rich](https://rich.readthedocs.io/en/stable/) dependency. It can be installed with the `[tree]` extra.

You can copy a group, including its consolidated metadata, to a destination store with the
`copy_to` method. The destination store may be of a different type than the source store:

```python exec="true" session="groups" source="above" result="ansi"
destination_store = zarr.storage.MemoryStore()
new_group = root.copy_to(destination_store, overwrite=True)
```

This is a raw byte-level copy of the metadata documents and stored chunks/shards:
the chunk bytes are transferred as-is, without being decoded and re-encoded.
[`zarr.Array.copy_to`][] is available too, for copying a single array.

The `write_order` argument controls what a partially completed (e.g. interrupted)
copy looks like to a reader. With the default `"data_first"`, all chunk data is
written before the metadata, and the root group's metadata (the hierarchy's index)
is written last, so an interrupted copy never exposes an index that points at data
which is not yet present. With `"metadata_first"`, the metadata is written first, so
the hierarchy is browsable while the copy is in progress (chunks that have not been
copied yet read as the array's fill value). The default can be set globally with the
`copy.write_order` [runtime configuration](config.md) value.
286 changes: 282 additions & 4 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import math
import warnings
from asyncio import gather
from collections.abc import Iterable, Mapping, Sequence
from asyncio import FIRST_COMPLETED, Task, ensure_future, gather, wait
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Mapping, Sequence
from dataclasses import dataclass, field, replace
from itertools import starmap
from logging import getLogger
Expand Down Expand Up @@ -60,6 +60,8 @@
ZARR_JSON,
ZARRAY_JSON,
ZATTRS_JSON,
ZGROUP_JSON,
ZMETADATA_V2_JSON,
ChunksLike,
DimensionNamesLike,
MemoryOrder,
Expand Down Expand Up @@ -156,6 +158,7 @@
from zarr.abc.store import Store
from zarr.codecs.sharding import IndexLocation
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar
from zarr.core.group import GroupMetadata
from zarr.storage import StoreLike
from zarr.types import AnyArray, AnyAsyncArray, ArrayV2, ArrayV3, AsyncArrayV2, AsyncArrayV3

Expand Down Expand Up @@ -1793,6 +1796,68 @@ def _info(
_count_chunks_initialized=count_chunks_initialized,
)

async def copy_to(
    self,
    store: StoreLike,
    *,
    path: str | None = None,
    overwrite: bool = False,
    write_order: Literal["data_first", "metadata_first"] | None = None,
) -> AnyAsyncArray:
    """
    Copy this array into another store and return the array opened from there.

    The transfer is byte-for-byte: the metadata documents and the stored chunks/shards are
    read from the source and written to the destination without decoding or re-encoding.
    Only data keys that the array's metadata defines *and* that exist in the source store
    are transferred, so foreign keys under the array's prefix and unwritten (sparse) chunks
    are left out. Transfers run concurrently, bounded by the ``async.concurrency``
    configuration value.

    Parameters
    ----------
    store : StoreLike
        The destination store.
    path : str, optional
        Location of the array inside the destination store. ``None`` (the default) is
        treated like the empty path.
    overwrite : bool, optional
        When True, everything under the destination path is deleted before copying. When
        False (the default), the destination is first checked for an existing node via
        ``ensure_no_existing_node``.
    write_order : {"data_first", "metadata_first"}, optional
        Determines what a partially finished (e.g. interrupted) copy looks like to a reader.
        ``"data_first"`` transfers chunk/shard data before the array metadata, so the array
        becomes openable only once its data is in place. ``"metadata_first"`` transfers the
        metadata before the data, so the array can be opened while the copy is running
        (chunks not yet copied read as ``fill_value``). ``None`` (the default) defers to the
        ``copy.write_order`` configuration value.

    Returns
    -------
    AsyncArray
        The array opened from the destination store.
    """
    # Resolve (and validate) the ordering before either store is touched.
    order = _resolve_write_order(write_order)
    destination = await make_store_path(store, path=path or "")

    if not overwrite:
        await ensure_no_existing_node(destination, zarr_format=self.metadata.zarr_format)
    else:
        await destination.store.delete_dir(destination.path)

    # Key prefixes for the node on each side; a node at the store root has no prefix.
    source = self.store_path
    src_prefix = f"{source.path}/" if source.path else ""
    dst_prefix = f"{destination.path}/" if destination.path else ""

    def stream_data() -> AsyncIterator[tuple[str, str]]:
        # Called by the copy driver when the data phase starts.
        return _array_data_pairs(self, src_prefix, dst_prefix)

    await _execute_copy(
        write_order=order,
        copy_one=_make_key_copier(source.store, destination.store, default_buffer_prototype()),
        limit=zarr_config.get("async.concurrency"),
        data_pairs=stream_data,
        # A lone array has no descendants; its own metadata is the "root" metadata.
        child_metadata_pairs=[],
        root_metadata_pairs=list(_node_metadata_pairs(self.metadata, src_prefix, dst_prefix)),
    )
    return await type(self).open(destination, zarr_format=self.metadata.zarr_format)


# TODO: Array can be a frozen data class again once property setters (e.g. shape) are removed
@dataclass(frozen=False)
Expand Down Expand Up @@ -1941,6 +2006,50 @@ def open(
async_array = sync(AsyncArray.open(store))
return cls(async_array)

def copy_to(
    self,
    store: StoreLike,
    *,
    path: str | None = None,
    overwrite: bool = False,
    write_order: Literal["data_first", "metadata_first"] | None = None,
) -> Array[T_ArrayMetadata]:
    """
    Copy this array into another store and return the array opened from there.

    Synchronous counterpart of the async array's ``copy_to``: all arguments are forwarded
    unchanged and the resulting async array is wrapped in a new instance of this class.

    The transfer is byte-for-byte: the metadata documents and the stored chunks/shards are
    copied without decoding or re-encoding. Only data keys that the array's metadata defines
    and that exist in the source store are transferred, so foreign keys under the array's
    prefix and unwritten (sparse) chunks are left out.

    Parameters
    ----------
    store : StoreLike
        The destination store.
    path : str, optional
        Location of the array inside the destination store.
    overwrite : bool, optional
        If True, overwrite any existing data in the destination. Default is False.
    write_order : {"data_first", "metadata_first"}, optional
        The order in which objects are written to the destination. ``"data_first"`` writes
        chunk/shard data before the array metadata, so the array only becomes openable once
        its data is present. ``"metadata_first"`` writes the metadata first, so the array is
        openable mid-copy with missing chunks reading as ``fill_value``. If None (default),
        the ``copy.write_order`` configuration value decides.

    Returns
    -------
    Array
        The new array in the destination store.
    """
    pending_copy = self._async_array.copy_to(
        store=store, path=path, overwrite=overwrite, write_order=write_order
    )
    copied_async_array = sync(pending_copy)
    return type(self)(copied_async_array)

@property
def store(self) -> Store:
return self.async_array.store
Expand Down Expand Up @@ -3975,9 +4084,9 @@ async def _shards_initialized(
store_contents = [
x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path)
]
store_contents_relative = [
store_contents_relative = {
_relativize_path(path=key, prefix=array.store_path.path) for key in store_contents
]
}
return tuple(
chunk_key for chunk_key in array._iter_shard_keys() if chunk_key in store_contents_relative
)
Expand Down Expand Up @@ -5193,6 +5302,175 @@ def _iter_shard_keys(
return (array.metadata.encode_chunk_key(k) for k in _iter)


def _iter_metadata_keys(
    metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata,
) -> Iterator[str]:
    """
    Yield the storage keys of a node's metadata documents, relative to the node's path.

    Which keys are produced depends only on the node's zarr format and on whether it is a
    group or an array. The result is the closed set of metadata keys a node *may* own; not
    every key is guaranteed to exist in the store (v2 consolidated metadata, for instance, is
    only there for consolidated groups), so callers that read or copy these keys should
    tolerate missing ones.

    Parameters
    ----------
    metadata : ArrayV2Metadata | ArrayV3Metadata | GroupMetadata
        The metadata of the node whose keys are enumerated.

    Yields
    ------
    key: str
        The relative storage key of each metadata document.
    """
    # Function-level import, presumably to avoid an import cycle with zarr.core.group.
    from zarr.core.group import GroupMetadata

    if metadata.zarr_format == 3:
        # Format 3 nodes, arrays and groups alike, yield a single key.
        yield ZARR_JSON
        return

    if isinstance(metadata, GroupMetadata):
        # The consolidated-metadata key is listed even though it may be absent in the store.
        v2_keys = (ZGROUP_JSON, ZATTRS_JSON, ZMETADATA_V2_JSON)
    else:
        v2_keys = (ZARRAY_JSON, ZATTRS_JSON)
    yield from v2_keys


def _resolve_write_order(
write_order: Literal["data_first", "metadata_first"] | None,
) -> Literal["data_first", "metadata_first"]:
"""Resolve the copy write-order, falling back to the ``copy.write_order`` config value."""
order = write_order if write_order is not None else zarr_config.get("copy.write_order")
if order not in ("data_first", "metadata_first"):
raise ValueError(
f"Invalid write_order {order!r}; expected 'data_first' or 'metadata_first'."
)
return cast("Literal['data_first', 'metadata_first']", order)


def _make_key_copier(
    src_store: Store, dst_store: Store, prototype: BufferPrototype
) -> Callable[[str, str], Awaitable[None]]:
    """
    Build a coroutine function that copies one key from ``src_store`` to ``dst_store``.

    The returned ``copy_one(src_key, dst_key)`` reads ``src_key`` from the source store using
    ``prototype`` and writes the result to ``dst_key`` in the destination store. If the
    source returns ``None`` for the key, nothing is written.
    """

    async def copy_one(src_key: str, dst_key: str) -> None:
        payload = await src_store.get(src_key, prototype=prototype)
        if payload is None:
            # The source has nothing under this key; leave the destination untouched.
            return
        await dst_store.set(dst_key, payload)

    return copy_one


def _node_metadata_pairs(
    metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata,
    src_node_prefix: str,
    dst_node_prefix: str,
) -> Iterator[tuple[str, str]]:
    """
    Yield ``(source_key, destination_key)`` pairs for a node's metadata documents.

    Each relative key produced by ``_iter_metadata_keys`` is prepended with the source and
    the destination node prefix respectively.
    """
    yield from (
        (f"{src_node_prefix}{document}", f"{dst_node_prefix}{document}")
        for document in _iter_metadata_keys(metadata)
    )


async def _array_data_pairs(
    array: AnyAsyncArray, src_node_prefix: str, dst_node_prefix: str
) -> AsyncIterator[tuple[str, str]]:
    """
    Yield ``(source_key, destination_key)`` pairs for an array's stored chunks/shards.

    Only data keys that the array's metadata defines and that are present in the store are
    produced; foreign keys and unwritten chunks are skipped.

    If the array's chunk key encoding reports that it can decode keys
    (``supports_decode()``), the source store is *streamed*: keys are listed lazily with
    ``list_prefix`` and each one is checked with ``metadata.is_chunk_key``, so the chunk grid
    is never enumerated here. Otherwise the keys come from
    [`_shards_initialized`][zarr.core.array._shards_initialized].
    """
    meta = array.metadata

    if not meta.chunk_key_encoding.supports_decode():
        # Fallback: let the sibling helper work out which shard keys are initialized.
        for name in await _shards_initialized(array):
            yield (f"{src_node_prefix}{name}", f"{dst_node_prefix}{name}")
        return

    location = array.store_path
    node_path = location.path
    own_prefix = f"{node_path}/" if node_path else ""
    strip = len(own_prefix)

    async for key in location.store.list_prefix(node_path):
        # list_prefix matches on string prefix, so keys of a sibling node whose path merely
        # starts with this array's path (e.g. "foo" vs "foobar") must be filtered out.
        if not own_prefix or key.startswith(own_prefix):
            name = key[strip:]
            if meta.is_chunk_key(name):
                yield (f"{src_node_prefix}{name}", f"{dst_node_prefix}{name}")


async def _aiter_pairs(pairs: Iterable[tuple[str, str]]) -> AsyncIterator[tuple[str, str]]:
    """
    Expose a synchronous iterable of key pairs as an async iterator.

    The pairs are yielded unchanged and in their original order.
    """
    for key_pair in pairs:
        yield key_pair


async def _copy_keys(
    pairs: AsyncIterator[tuple[str, str]],
    copy_one: Callable[[str, str], Awaitable[None]],
    limit: int,
) -> None:
    """
    Copy ``(source_key, destination_key)`` pairs with at most ``limit`` transfers in flight.

    Pairs are consumed lazily and at most ``limit`` copy tasks exist at any moment, so neither
    the full key set nor one task per key is ever resident. This bounds memory when copying
    hierarchies with very large numbers of objects, unlike materializing every key up front.

    Parameters
    ----------
    pairs : AsyncIterator[tuple[str, str]]
        The ``(source_key, destination_key)`` pairs to transfer.
    copy_one : Callable[[str, str], Awaitable[None]]
        Called once per pair to perform the transfer.
    limit : int
        Maximum number of transfers in flight at once. Must be at least 1.

    Raises
    ------
    ValueError
        If ``limit`` is less than 1. This is checked before any pair is consumed; without the
        check, ``asyncio.wait`` would be handed an empty task set and fail with an unrelated
        message after the first pair had already been pulled from ``pairs``.
    Exception
        The error of a failed transfer. Transfers still in flight at that point are cancelled
        and awaited before the error propagates.
    """
    if limit < 1:
        raise ValueError(f"limit must be a positive integer, got {limit!r}.")

    def raise_first_failure(finished: set[Task[None]]) -> None:
        # Inspect *every* finished task, not just the first failing one: a task whose
        # exception is never looked at makes asyncio log "Task exception was never
        # retrieved" when it is garbage collected.
        first_error: Exception | None = None
        for task in finished:
            try:
                task.result()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    pending: set[Task[None]] = set()
    try:
        async for src_key, dst_key in pairs:
            if len(pending) >= limit:
                # All slots are taken: wait for at least one transfer to finish.
                done, pending = await wait(pending, return_when=FIRST_COMPLETED)
                raise_first_failure(done)
            pending.add(ensure_future(copy_one(src_key, dst_key)))
        # The source is exhausted; drain what is still running.
        while pending:
            done, pending = await wait(pending, return_when=FIRST_COMPLETED)
            raise_first_failure(done)
    finally:
        # Only non-empty when leaving early (error or cancellation): stop the remaining
        # transfers and wait for them to actually finish.
        for task in pending:
            task.cancel()
        if pending:
            await gather(*pending, return_exceptions=True)


async def _execute_copy(
    *,
    write_order: Literal["data_first", "metadata_first"],
    copy_one: Callable[[str, str], Awaitable[None]],
    limit: int,
    data_pairs: Callable[[], AsyncIterator[tuple[str, str]]],
    child_metadata_pairs: list[tuple[str, str]],
    root_metadata_pairs: list[tuple[str, str]],
) -> None:
    """
    Run a node/hierarchy copy as a sequence of ordered phases.

    With ``"data_first"`` the phases are: all chunk/shard data, then the descendants'
    metadata, then the root node's metadata -- so an interrupted copy never exposes an index
    (group metadata, including consolidated metadata) that points at data which is not yet
    present. With any other ``write_order`` (i.e. ``"metadata_first"``) all metadata is
    copied in one phase, followed by the data.

    Each phase is awaited to completion before the next one starts; ``data_pairs`` is only
    called when the data phase begins.
    """
    if write_order != "data_first":
        all_metadata = [*child_metadata_pairs, *root_metadata_pairs]
        await _copy_keys(_aiter_pairs(all_metadata), copy_one, limit)
        await _copy_keys(data_pairs(), copy_one, limit)
        return

    await _copy_keys(data_pairs(), copy_one, limit)
    # Descendant metadata first; the root metadata is the hierarchy's index and goes last.
    for metadata_phase in (child_metadata_pairs, root_metadata_pairs):
        await _copy_keys(_aiter_pairs(metadata_phase), copy_one, limit)


def _iter_shard_regions(
array: AnyArray | AnyAsyncArray,
*,
Expand Down
Loading