diff --git a/changes/3612.feature.md b/changes/3612.feature.md new file mode 100644 index 0000000000..734d6b7111 --- /dev/null +++ b/changes/3612.feature.md @@ -0,0 +1,14 @@ +Added ``copy_to`` convenience methods to ``zarr.Group`` and ``zarr.Array`` (and their async +counterparts) for copying a node, or a whole group hierarchy, to a destination store that may be +of a different type than the source. The copy is a raw byte-level transfer of the metadata +documents and stored chunks/shards (chunk bytes are not decoded and re-encoded), and reproduces +the keys of the copied nodes exactly, including consolidated metadata (stray keys that are not a +node's metadata or chunk/shard are skipped). The new ``write_order`` argument (and the +``copy.write_order`` configuration value) controls whether data or metadata is written first, so an +interrupted copy can be made to never expose an index that points at data which is not yet present. + +The copy streams the source store rather than materializing the chunk grid, so it works on arrays +with arbitrarily many chunks in bounded memory. This is supported by a new +``ChunkKeyEncoding.is_chunk_key`` method (and ``ArrayV2Metadata.is_chunk_key`` / +``ArrayV3Metadata.is_chunk_key``) that recognizes whether a store key is a valid chunk/shard key +for an array in O(1), without enumerating the grid. diff --git a/docs/user-guide/groups.md b/docs/user-guide/groups.md index 5faa26a281..9d372840cd 100644 --- a/docs/user-guide/groups.md +++ b/docs/user-guide/groups.md @@ -131,3 +131,26 @@ Groups also have the [`zarr.Group.tree`][] method, e.g.: print(root.tree()) ``` +!!! note + [`zarr.Group.tree`][] requires the optional [rich](https://rich.readthedocs.io/en/stable/) dependency. It can be installed with the `[tree]` extra. 
+ +You can copy a Group (including consolidated metadata) to a new destination store +(the type of store can differ from the source store) using the `copy_to` method: + +```python exec="true" session="groups" source="above" result="ansi" +destination_store = zarr.storage.MemoryStore() +new_group = root.copy_to(destination_store, overwrite=True) +``` + +This is a raw byte-level copy of the metadata documents and stored chunks/shards: +the chunk bytes are transferred as-is, without being decoded and re-encoded. +[`zarr.Array.copy_to`][] is available too, for copying a single array. + +The `write_order` argument controls what a partially completed (e.g. interrupted) +copy looks like to a reader. With the default `"data_first"`, all chunk data is +written before the metadata, and the root group's metadata (the hierarchy's index) +is written last, so an interrupted copy never exposes an index that points at data +which is not yet present. With `"metadata_first"`, the metadata is written first, so +the hierarchy is browsable while the copy is in progress (chunks that have not been +copied yet read as the array's fill value). The default can be set globally with the +`copy.write_order` [runtime configuration](config.md) value. 
diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 977520b12e..60be7e86be 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -2,8 +2,8 @@ import math import warnings -from asyncio import gather -from collections.abc import Iterable, Mapping, Sequence +from asyncio import FIRST_COMPLETED, Task, ensure_future, gather, wait +from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Mapping, Sequence from dataclasses import dataclass, field, replace from itertools import starmap from logging import getLogger @@ -60,6 +60,8 @@ ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, + ZGROUP_JSON, + ZMETADATA_V2_JSON, ChunksLike, DimensionNamesLike, MemoryOrder, @@ -156,6 +158,7 @@ from zarr.abc.store import Store from zarr.codecs.sharding import IndexLocation from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar + from zarr.core.group import GroupMetadata from zarr.storage import StoreLike from zarr.types import AnyArray, AnyAsyncArray, ArrayV2, ArrayV3, AsyncArrayV2, AsyncArrayV3 @@ -1793,6 +1796,68 @@ def _info( _count_chunks_initialized=count_chunks_initialized, ) + async def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + write_order: Literal["data_first", "metadata_first"] | None = None, + ) -> AnyAsyncArray: + """ + Copy this array to a new store. + + This performs a raw byte-level copy of the array's metadata and stored chunks/shards, + without decoding or re-encoding the data. Only the data keys defined by the array's + metadata that actually exist in the store are copied, so foreign keys under the array's + prefix and unwritten (sparse) chunks are skipped. Objects are transferred concurrently, + governed by the ``async.concurrency`` configuration value. + + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Array path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. 
Default is False. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination, controlling what a partially + completed (e.g. interrupted) copy looks like to a reader. ``"data_first"`` writes the + chunk/shard data before the array metadata, so the array only becomes openable once its + data is present. ``"metadata_first"`` writes the metadata before the data, so the array + is openable while the copy is in progress (missing chunks read as ``fill_value``). If + None (default), the ``copy.write_order`` configuration value is used. + + Returns + ------- + AsyncArray + The new array in the target store. + """ + resolved_order = _resolve_write_order(write_order) + target_store_path = await make_store_path(store, path=path or "") + + if overwrite: + await target_store_path.store.delete_dir(target_store_path.path) + else: + await ensure_no_existing_node(target_store_path, zarr_format=self.metadata.zarr_format) + + src_prefix = self.store_path.path + "/" if self.store_path.path else "" + dst_prefix = target_store_path.path + "/" if target_store_path.path else "" + copy_one = _make_key_copier( + self.store_path.store, target_store_path.store, default_buffer_prototype() + ) + + await _execute_copy( + write_order=resolved_order, + copy_one=copy_one, + limit=zarr_config.get("async.concurrency"), + data_pairs=lambda: _array_data_pairs(self, src_prefix, dst_prefix), + child_metadata_pairs=[], + root_metadata_pairs=list(_node_metadata_pairs(self.metadata, src_prefix, dst_prefix)), + ) + return await type(self).open(target_store_path, zarr_format=self.metadata.zarr_format) + # TODO: Array can be a frozen data class again once property setters (e.g. 
shape) are removed @dataclass(frozen=False) @@ -1941,6 +2006,50 @@ def open( async_array = sync(AsyncArray.open(store)) return cls(async_array) + def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + write_order: Literal["data_first", "metadata_first"] | None = None, + ) -> Array[T_ArrayMetadata]: + """ + Copy this array to a new store. + + This performs a raw byte-level copy of the array's metadata and stored chunks/shards, + without decoding or re-encoding the data. Only the data keys defined by the array's + metadata that actually exist in the store are copied, so foreign keys under the array's + prefix and unwritten (sparse) chunks are skipped. + + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Array path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination. ``"data_first"`` (the default + behaviour when None, via the ``copy.write_order`` config value) writes chunk/shard data + before the array metadata, so the array only becomes openable once its data is present. + ``"metadata_first"`` writes the metadata first, so the array is openable mid-copy with + missing chunks reading as ``fill_value``. + + Returns + ------- + Array + The new array in the target store. 
+ """ + return type(self)( + sync( + self._async_array.copy_to( + store=store, path=path, overwrite=overwrite, write_order=write_order + ) + ) + ) + @property def store(self) -> Store: return self.async_array.store @@ -3975,9 +4084,9 @@ async def _shards_initialized( store_contents = [ x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path) ] - store_contents_relative = [ + store_contents_relative = { _relativize_path(path=key, prefix=array.store_path.path) for key in store_contents - ] + } return tuple( chunk_key for chunk_key in array._iter_shard_keys() if chunk_key in store_contents_relative ) @@ -5193,6 +5302,175 @@ def _iter_shard_keys( return (array.metadata.encode_chunk_key(k) for k in _iter) +def _iter_metadata_keys( + metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata, +) -> Iterator[str]: + """ + Iterate over the storage keys of the metadata documents for a node, relative to the node's path. + + These keys are defined entirely by the node's zarr format and type. This is the closed set of + metadata keys a node may own; some keys (e.g. v2 consolidated metadata) are only present in the + store under certain conditions, so callers that copy or read these keys should tolerate absent + ones rather than assuming all are present. + + Parameters + ---------- + metadata : ArrayV2Metadata | ArrayV3Metadata | GroupMetadata + The metadata of the node to enumerate keys for. + + Yields + ------ + key: str + The relative storage key of each metadata document. + """ + from zarr.core.group import GroupMetadata + + if metadata.zarr_format == 3: + yield ZARR_JSON + elif isinstance(metadata, GroupMetadata): + yield ZGROUP_JSON + yield ZATTRS_JSON + # Consolidated metadata is only written when the group is consolidated; it may be absent. 
+ yield ZMETADATA_V2_JSON + else: + yield ZARRAY_JSON + yield ZATTRS_JSON + + +def _resolve_write_order( + write_order: Literal["data_first", "metadata_first"] | None, +) -> Literal["data_first", "metadata_first"]: + """Resolve the copy write-order, falling back to the ``copy.write_order`` config value.""" + order = write_order if write_order is not None else zarr_config.get("copy.write_order") + if order not in ("data_first", "metadata_first"): + raise ValueError( + f"Invalid write_order {order!r}; expected 'data_first' or 'metadata_first'." + ) + return cast("Literal['data_first', 'metadata_first']", order) + + +def _make_key_copier( + src_store: Store, dst_store: Store, prototype: BufferPrototype +) -> Callable[[str, str], Awaitable[None]]: + """Build a coroutine that copies the bytes at one key from ``src_store`` to ``dst_store``.""" + + async def copy_one(src_key: str, dst_key: str) -> None: + data = await src_store.get(src_key, prototype=prototype) + if data is not None: + await dst_store.set(dst_key, data) + + return copy_one + + +def _node_metadata_pairs( + metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata, + src_node_prefix: str, + dst_node_prefix: str, +) -> Iterator[tuple[str, str]]: + """Yield ``(source_key, destination_key)`` pairs for a node's metadata documents.""" + for relative_key in _iter_metadata_keys(metadata): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + + +async def _array_data_pairs( + array: AnyAsyncArray, src_node_prefix: str, dst_node_prefix: str +) -> AsyncIterator[tuple[str, str]]: + """ + Yield ``(source_key, destination_key)`` pairs for an array's stored chunks/shards. + + Only the data keys the array's metadata defines that actually exist in the store are produced + (foreign keys and unwritten chunks are skipped). 
+ + When the array's chunk key encoding can decode keys (the built-in encodings can), the source + store is *streamed*: keys are listed lazily and each is recognized in O(1) via + ``metadata.is_chunk_key``, so the chunk grid is never materialized. This keeps memory bounded + when copying arrays with arbitrarily many chunks. Custom encodings that cannot decode keys fall + back to enumerating the grid via [`_shards_initialized`][zarr.core.array._shards_initialized]. + """ + metadata = array.metadata + if metadata.chunk_key_encoding.supports_decode(): + store = array.store_path.store + array_path = array.store_path.path + child_prefix = f"{array_path}/" if array_path else "" + async for stored_key in store.list_prefix(array_path): + # list_prefix matches on string prefix, so guard against sibling nodes whose path + # starts with this array's path (e.g. "foo" vs "foobar"). + if child_prefix and not stored_key.startswith(child_prefix): + continue + relative_key = stored_key[len(child_prefix) :] + if metadata.is_chunk_key(relative_key): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + else: + for relative_key in await _shards_initialized(array): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + + +async def _aiter_pairs(pairs: Iterable[tuple[str, str]]) -> AsyncIterator[tuple[str, str]]: + """Adapt a synchronous iterable of key pairs to an async iterator.""" + for pair in pairs: + yield pair + + +async def _copy_keys( + pairs: AsyncIterator[tuple[str, str]], + copy_one: Callable[[str, str], Awaitable[None]], + limit: int, +) -> None: + """ + Copy ``(source_key, destination_key)`` pairs with at most ``limit`` transfers in flight. + + Pairs are consumed lazily and at most ``limit`` copy tasks exist at any moment, so neither the + full key set nor one task per key is ever resident. This bounds memory when copying hierarchies + with very large numbers of objects, unlike materializing every key up front. 
+ """ + pending: set[Task[None]] = set() + try: + async for src_key, dst_key in pairs: + if len(pending) >= limit: + done, pending = await wait(pending, return_when=FIRST_COMPLETED) + for task in done: + task.result() # propagate any exception + pending.add(ensure_future(copy_one(src_key, dst_key))) + while pending: + done, pending = await wait(pending, return_when=FIRST_COMPLETED) + for task in done: + task.result() + finally: + for task in pending: + task.cancel() + if pending: + await gather(*pending, return_exceptions=True) + + +async def _execute_copy( + *, + write_order: Literal["data_first", "metadata_first"], + copy_one: Callable[[str, str], Awaitable[None]], + limit: int, + data_pairs: Callable[[], AsyncIterator[tuple[str, str]]], + child_metadata_pairs: list[tuple[str, str]], + root_metadata_pairs: list[tuple[str, str]], +) -> None: + """ + Run a node/hierarchy copy in two ordered phases. + + For ``"data_first"`` all chunk/shard data is copied, then descendant metadata, then the root + node's metadata last of all -- so an interrupted copy never exposes an index (group metadata, + including consolidated metadata) that points at data which is not yet present. For + ``"metadata_first"`` the metadata is written before the data. + """ + if write_order == "data_first": + await _copy_keys(data_pairs(), copy_one, limit) + await _copy_keys(_aiter_pairs(child_metadata_pairs), copy_one, limit) + # The root metadata is the hierarchy's index; write it dead last. 
+ await _copy_keys(_aiter_pairs(root_metadata_pairs), copy_one, limit) + else: + await _copy_keys( + _aiter_pairs([*child_metadata_pairs, *root_metadata_pairs]), copy_one, limit + ) + await _copy_keys(data_pairs(), copy_one, limit) + + def _iter_shard_regions( array: AnyArray | AnyAsyncArray, *, diff --git a/src/zarr/core/chunk_key_encodings.py b/src/zarr/core/chunk_key_encodings.py index 098f2c8981..5fe3bd7f46 100644 --- a/src/zarr/core/chunk_key_encodings.py +++ b/src/zarr/core/chunk_key_encodings.py @@ -54,6 +54,59 @@ def decode_chunk_key(self, chunk_key: str) -> tuple[int, ...]: """ raise NotImplementedError(f"{self.__class__.__name__} does not implement decode_chunk_key.") + def supports_decode(self) -> bool: + """ + Whether this encoding can decode chunk keys back into coordinates. + + True when ``decode_chunk_key`` is overridden (as it is for the built-in ``default`` and + ``v2`` encodings). Callers that need to recognize chunk keys -- e.g. to validate or filter + store keys without enumerating the whole chunk grid -- can use this to decide whether + [`is_chunk_key`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.is_chunk_key] is usable. + """ + return type(self).decode_chunk_key is not ChunkKeyEncoding.decode_chunk_key + + def is_chunk_key(self, key: str, *, grid_shape: tuple[int, ...]) -> bool: + """ + Return whether ``key`` is a valid chunk key for a chunk grid of shape ``grid_shape``. + + A key is valid when it is a well-formed, *canonical* encoding of chunk coordinates of the + right dimensionality (``len(grid_shape)``) whose coordinates all fall within the grid + (``0 <= coord < grid_shape[dim]``). Canonical means the key round-trips: + ``encode_chunk_key(decode_chunk_key(key)) == key``, so non-canonical strings such as a + zero-padded ``"c/01"`` are rejected, as are metadata documents and foreign keys that happen + to share the array's prefix. 
+ + This is O(1) in the number of stored chunks, so it can classify store keys discovered by + listing without materializing the chunk grid -- the key primitive for streaming operations + over arrays with very large numbers of chunks. Requires + [`supports_decode`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.supports_decode]; raises + ``NotImplementedError`` otherwise. + + Parameters + ---------- + key : str + The candidate chunk key, relative to the array's path. + grid_shape : tuple[int, ...] + The number of chunks along each dimension of the (top-level / stored-object) chunk + grid. For a sharded array this is the shard grid, since shards are the stored objects. + + Returns + ------- + bool + """ + ndim = len(grid_shape) + # A zero-dimensional array stores its single chunk under a special key ("c" for the + # default encoding, "0" for v2) that does not decode back to an empty coordinate tuple. + if ndim == 0: + return key == self.encode_chunk_key(()) + try: + coords = self.decode_chunk_key(key) + except (ValueError, IndexError): + return False + if len(coords) != ndim or self.encode_chunk_key(coords) != key: + return False + return all(0 <= coord < size for coord, size in zip(coords, grid_shape, strict=True)) + @abstractmethod def encode_chunk_key(self, chunk_coords: tuple[int, ...]) -> str: """ @@ -79,7 +132,12 @@ def __post_init__(self) -> None: def decode_chunk_key(self, chunk_key: str) -> tuple[int, ...]: if chunk_key == "c": return () - return tuple(map(int, chunk_key[1:].split(self.separator))) + # Keys are "c" followed by the separator and the separator-joined coordinates, + # e.g. "c/0/1" or "c.0.1". Strip the leading "c" before splitting. 
+ prefix = "c" + self.separator + if not chunk_key.startswith(prefix): + raise ValueError(f"Invalid chunk key {chunk_key!r} for {self.name} encoding.") + return tuple(int(part) for part in chunk_key[len(prefix) :].split(self.separator)) def encode_chunk_key(self, chunk_coords: tuple[int, ...]) -> str: return self.separator.join(map(str, ("c",) + chunk_coords)) diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index 08d2a50ace..4691057ff9 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -103,6 +103,7 @@ def enable_gpu(self) -> ConfigSet: "sharding_coalesce_max_bytes": 16 << 20, # 16 MiB }, "async": {"concurrency": 10, "timeout": None}, + "copy": {"write_order": "data_first"}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index 52eaa3e144..0e4a7794f9 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -27,7 +27,12 @@ FiltersLike, SerializerLike, ShardsLike, + _array_data_pairs, + _execute_copy, + _make_key_copier, + _node_metadata_pairs, _parse_deprecated_compressor, + _resolve_write_order, create_array, ) from zarr.core.attributes import Attributes @@ -689,6 +694,97 @@ def from_dict( store_path=store_path, ) + async def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + use_consolidated_for_children: bool = True, + write_order: Literal["data_first", "metadata_first"] | None = None, + ) -> AsyncGroup: + """ + Copy this group and all its contents to a new store. + + This performs a raw byte-level copy of all data, without decoding or re-encoding array + contents. Only the data keys an array's metadata defines that exist in the store are + copied, so foreign keys and unwritten (sparse) chunks are skipped. Objects are transferred + concurrently (governed by the ``async.concurrency`` config value) and streamed, so the full + set of keys is never held in memory at once. 
+ + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Group path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + use_consolidated_for_children : bool, default True + Whether to use the consolidated metadata of child groups when iterating over the store contents. + Note that this only affects groups loaded from the store. If the current Group already has + consolidated metadata, it will always be used. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination, controlling what a partially + completed (e.g. interrupted) copy looks like to a reader. ``"data_first"`` copies all + chunk/shard data first, then metadata, with the root group metadata (the hierarchy's + index, including any consolidated metadata) written dead last -- so an interrupted copy + never exposes an index that points at data which is not yet present. ``"metadata_first"`` + writes metadata before data, so the hierarchy is browsable mid-copy with missing chunks + reading as ``fill_value``. If None (default), the ``copy.write_order`` config value is used. + + Returns + ------- + AsyncGroup + The new group in the target store. + """ + resolved_order = _resolve_write_order(write_order) + target_store_path = await make_store_path(store, path=path or "") + + if overwrite: + await target_store_path.store.delete_dir(target_store_path.path) + else: + await ensure_no_existing_node(target_store_path, zarr_format=self.metadata.zarr_format) + + src_prefix = self.store_path.path + "/" if self.store_path.path else "" + dst_prefix = target_store_path.path + "/" if target_store_path.path else "" + copy_one = _make_key_copier( + self.store_path.store, target_store_path.store, default_buffer_prototype() + ) + + # Single traversal of the hierarchy. 
We collect only the metadata key pairs (one small set + # per node) and references to the array nodes -- never the data keys -- so memory stays + # bounded by the number of nodes, not the number of chunks. + root_metadata_pairs = list(_node_metadata_pairs(self.metadata, src_prefix, dst_prefix)) + child_metadata_pairs: list[tuple[str, str]] = [] + array_nodes: list[tuple[AnyAsyncArray, str, str]] = [] + async for child_path, member in self.members( + max_depth=None, use_consolidated_for_children=use_consolidated_for_children + ): + node_src = f"{src_prefix}{child_path}/" + node_dst = f"{dst_prefix}{child_path}/" + child_metadata_pairs.extend(_node_metadata_pairs(member.metadata, node_src, node_dst)) + if isinstance(member, AsyncArray): + array_nodes.append((member, node_src, node_dst)) + + async def data_pairs() -> AsyncIterator[tuple[str, str]]: + # Each array's data keys are materialized one array at a time, so iterating the whole + # hierarchy's chunks never holds more than a single array's key set. + for array, node_src, node_dst in array_nodes: + async for pair in _array_data_pairs(array, node_src, node_dst): + yield pair + + await _execute_copy( + write_order=resolved_order, + copy_one=copy_one, + limit=config.get("async.concurrency"), + data_pairs=data_pairs, + child_metadata_pairs=child_metadata_pairs, + root_metadata_pairs=root_metadata_pairs, + ) + + return await type(self).open(target_store_path, zarr_format=self.metadata.zarr_format) + async def setitem(self, key: str, value: Any) -> None: """ Fastpath for creating a new array @@ -1805,6 +1901,54 @@ def open( obj = sync(AsyncGroup.open(store, zarr_format=zarr_format)) return cls(obj) + def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + use_consolidated_for_children: bool = True, + write_order: Literal["data_first", "metadata_first"] | None = None, + ) -> Group: + """ + Copy this group and all its contents to a new store. 
+ + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Group path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + use_consolidated_for_children : bool, default True + Whether to use the consolidated metadata of child groups when iterating over the store contents. + Note that this only affects groups loaded from the store. If the current Group already has + consolidated metadata, it will always be used. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination. ``"data_first"`` (the default + behaviour when None, via the ``copy.write_order`` config value) copies all data first, + then metadata, with the root group metadata written dead last, so an interrupted copy + never exposes an index pointing at absent data. ``"metadata_first"`` writes metadata + first, so the hierarchy is browsable mid-copy with missing chunks reading as ``fill_value``. + + Returns + ------- + Group + The new group in the target store. + """ + return Group( + sync( + self._async_group.copy_to( + store=store, + path=path, + overwrite=overwrite, + use_consolidated_for_children=use_consolidated_for_children, + write_order=write_order, + ) + ) + ) + def __getitem__(self, path: str) -> AnyArray | Group: """Obtain a group member. @@ -2327,13 +2471,22 @@ def tree( self._async_group.tree(expand=expand, level=level, max_nodes=max_nodes, plain=plain) ) - def create_group(self, name: str, **kwargs: Any) -> Group: + def create_group( + self, + name: str, + overwrite: bool = False, + attributes: dict[str, Any] | None = None, + ) -> Group: """Create a sub-group. Parameters ---------- name : str Name of the new subgroup. + overwrite : bool, optional + If True, do not raise an error if the group already exists. + attributes : dict, optional + Group attributes. 
Returns ------- @@ -2347,7 +2500,15 @@ def create_group(self, name: str, **kwargs: Any) -> Group: >>> subgroup """ - return Group(self._sync(self._async_group.create_group(name, **kwargs))) + return Group( + self._sync( + self._async_group.create_group( + name, + overwrite=overwrite, + attributes=attributes, + ) + ) + ) def require_group(self, name: str, **kwargs: Any) -> Group: """Obtain a sub-group, creating one if it doesn't exist. diff --git a/src/zarr/core/metadata/v2.py b/src/zarr/core/metadata/v2.py index ac32521239..1d496ba544 100644 --- a/src/zarr/core/metadata/v2.py +++ b/src/zarr/core/metadata/v2.py @@ -32,12 +32,13 @@ from zarr.core._json import json_to_buffer from zarr.core.array_spec import ArrayConfig, ArraySpec -from zarr.core.chunk_key_encodings import parse_separator +from zarr.core.chunk_key_encodings import ChunkKeyEncoding, V2ChunkKeyEncoding, parse_separator from zarr.core.common import ( JSON, ZARRAY_JSON, ZATTRS_JSON, MemoryOrder, + ceildiv, parse_shapelike, ) from zarr.core.config import config, parse_indexing_order @@ -116,6 +117,26 @@ def __init__( def ndim(self) -> int: return len(self.shape) + @property + def chunk_key_encoding(self) -> ChunkKeyEncoding: + """The chunk key encoding for this array (a v2 encoding using its dimension separator).""" + return V2ChunkKeyEncoding(separator=self.dimension_separator) + + def is_chunk_key(self, key: str) -> bool: + """ + Return whether ``key`` (relative to the array's path) is a valid storage key for one of + this array's stored chunks. + + A key is valid when it is a well-formed, canonical, in-bounds chunk key per this array's + chunk key encoding and grid (see + [`ChunkKeyEncoding.is_chunk_key`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.is_chunk_key]). + Metadata documents and foreign keys that share the array's prefix return False. This is + O(1) in the number of chunks, so store keys can be classified by listing rather than by + enumerating the chunk grid. 
+ """ + grid_shape = tuple(ceildiv(s, c) for s, c in zip(self.shape, self.chunks, strict=True)) + return self.chunk_key_encoding.is_chunk_key(key, grid_shape=grid_shape) + @cached_property def chunk_grid(self) -> ChunkGrid: """Backwards-compatible chunk grid property. diff --git a/src/zarr/core/metadata/v3.py b/src/zarr/core/metadata/v3.py index 9eaccc5076..78709c9292 100644 --- a/src/zarr/core/metadata/v3.py +++ b/src/zarr/core/metadata/v3.py @@ -24,6 +24,7 @@ DimensionNamesLike, NamedConfig, NamedRequiredConfig, + ceildiv, compress_rle, expand_rle, parse_named_configuration, @@ -557,6 +558,37 @@ def _validate_metadata(self) -> None: def ndim(self) -> int: return len(self.shape) + @property + def _stored_object_grid_shape(self) -> tuple[int, ...]: + """ + The number of stored objects (chunks, or shards when sharding) along each dimension. + + This is the top-level chunk grid described by ``chunk_grid``: for a sharded array each + top-level chunk is stored as one shard, so this counts shards, matching the keys produced + by ``chunk_key_encoding.encode_chunk_key``. + """ + grid = self.chunk_grid + if isinstance(grid, RegularChunkGridMetadata): + return tuple(ceildiv(s, c) for s, c in zip(self.shape, grid.chunk_shape, strict=True)) + return tuple( + len(chunk_sizes) if isinstance(chunk_sizes, tuple) else ceildiv(extent, chunk_sizes) + for extent, chunk_sizes in zip(self.shape, grid.chunk_shapes, strict=True) + ) + + def is_chunk_key(self, key: str) -> bool: + """ + Return whether ``key`` (relative to the array's path) is a valid storage key for one of + this array's stored chunks/shards. + + A key is valid when it is a well-formed, canonical, in-bounds chunk key per this array's + chunk key encoding and grid (see + [`ChunkKeyEncoding.is_chunk_key`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.is_chunk_key]). + Metadata documents and foreign keys that share the array's prefix return False. 
This is + O(1) in the number of chunks, so store keys can be classified by listing rather than by + enumerating the chunk grid. + """ + return self.chunk_key_encoding.is_chunk_key(key, grid_shape=self._stored_object_grid_shape) + @property def dtype(self) -> ZDType[TBaseDType, TBaseScalar]: return self.data_type diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index f2c83677cc..d0bbcc012e 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -536,6 +536,7 @@ def basic_indices( allow_ellipsis: TrueOrFalse = True, ) -> Any: """Basic indices without unsupported negative slices.""" + strategy = npst.basic_indices( shape=shape, min_dims=min_dims, diff --git a/tests/test_array.py b/tests/test_array.py index 0d6d2d5906..e7f5b115cf 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -72,7 +72,7 @@ from zarr.core.group import AsyncGroup from zarr.core.indexing import BasicIndexer, _iter_grid, _iter_regions from zarr.core.metadata.v2 import ArrayV2Metadata -from zarr.core.sync import sync +from zarr.core.sync import _collect_aiterator, sync from zarr.errors import ( ContainsArrayError, ContainsGroupError, @@ -2374,3 +2374,112 @@ async def test_create_array_chunks_3d( shape = (10, 12, 15) arr = await create_array(store={}, shape=shape, chunks=chunk_input, dtype="float64") assert arr.write_chunk_sizes == expected + + +def test_array_copy_to(zarr_format: ZarrFormat) -> None: + """Array.copy_to performs a faithful byte-level copy of a single array node.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(30,), chunks=(7,), dtype="int64") + a[:] = np.arange(30) + + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + + dst_store = MemoryStore() + copied = arr.copy_to(dst_store, overwrite=True) + assert isinstance(copied, zarr.Array) + assert np.array_equal(copied[:], np.arange(30)) + + +def 
test_array_copy_to_with_path(zarr_format: ZarrFormat) -> None: + """Array.copy_to honors a destination path.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(15,), chunks=(4,), dtype="int64") + a[:] = np.arange(15) + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + + dst_store = MemoryStore() + copied = arr.copy_to(dst_store, path="nested/clone", overwrite=True) + assert copied.store_path.path == "nested/clone" + assert np.array_equal(copied[:], np.arange(15)) + + +def test_array_copy_to_overwrite_false_raises(zarr_format: ZarrFormat) -> None: + """Array.copy_to with overwrite=False raises when the destination is occupied.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(10,), chunks=(5,), dtype="int64") + a[:] = np.arange(10) + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + + dst_store = MemoryStore() + # Put a group at the destination root so the target location is occupied. + zarr.open_group(dst_store, mode="w", zarr_format=zarr_format) + with pytest.raises((ContainsArrayError, ContainsGroupError)): + arr.copy_to(dst_store, overwrite=False) + + +def test_array_copy_to_skips_foreign_and_unwritten(zarr_format: ZarrFormat) -> None: + """Array.copy_to copies only defined, written data keys; foreign keys and gaps are skipped.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(100,), chunks=(10,), dtype="int64") + a[0:10] = np.arange(10) # only the first chunk written -> sparse + # Plant a foreign key under the array prefix. 
+ sync( + src_store.set("dataset/not_a_chunk", default_buffer_prototype().buffer.from_bytes(b"junk")) + ) + + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + dst_store = MemoryStore() + arr.copy_to(dst_store, overwrite=True) + + dst_keys = set(sync(_collect_aiterator(dst_store.list()))) + # The foreign key (which would relativize to "not_a_chunk") was not copied. + assert "not_a_chunk" not in dst_keys + # The one written chunk round-trips; the array is faithfully reproduced. + copied = zarr.open_array(dst_store) + assert np.array_equal(copied[0:10], np.arange(10)) + + +def test_copy_keys_bounds_in_flight_transfers() -> None: + """_copy_keys streams pairs and keeps at most ``limit`` transfers in flight.""" + import asyncio + + from zarr.core.array import _aiter_pairs, _copy_keys + + limit = 4 + state = {"in_flight": 0, "max_in_flight": 0} + + async def copy_one(src_key: str, dst_key: str) -> None: + state["in_flight"] += 1 + state["max_in_flight"] = max(state["max_in_flight"], state["in_flight"]) + await asyncio.sleep(0) + state["in_flight"] -= 1 + + pairs = [(f"s{i}", f"d{i}") for i in range(50)] + sync(_copy_keys(_aiter_pairs(pairs), copy_one, limit)) + + assert state["max_in_flight"] <= limit + assert state["max_in_flight"] > 1 # concurrency was actually exercised + + +def test_copy_keys_propagates_exceptions() -> None: + """_copy_keys surfaces an exception raised by an individual transfer.""" + import asyncio + + from zarr.core.array import _aiter_pairs, _copy_keys + + async def copy_one(src_key: str, dst_key: str) -> None: + await asyncio.sleep(0) + if src_key == "s3": + raise RuntimeError("boom") + + pairs = [(f"s{i}", f"d{i}") for i in range(10)] + with pytest.raises(RuntimeError, match="boom"): + sync(_copy_keys(_aiter_pairs(pairs), copy_one, 4)) diff --git a/tests/test_chunk_key_encodings.py b/tests/test_chunk_key_encodings.py new file mode 100644 index 0000000000..0f70fc6d2e --- /dev/null +++ 
b/tests/test_chunk_key_encodings.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import pytest + +import zarr +from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding +from zarr.storage import MemoryStore + + +@pytest.mark.parametrize("separator", [".", "/"]) +@pytest.mark.parametrize("coords", [(), (0,), (3,), (1, 2), (10, 0, 7)]) +def test_default_encoding_round_trips(separator: str, coords: tuple[int, ...]) -> None: + enc = DefaultChunkKeyEncoding(separator=separator) # type: ignore[arg-type] + key = enc.encode_chunk_key(coords) + assert enc.decode_chunk_key(key) == coords + + +@pytest.mark.parametrize("separator", [".", "/"]) +@pytest.mark.parametrize("coords", [(0,), (3,), (1, 2), (10, 0, 7)]) +def test_v2_encoding_round_trips(separator: str, coords: tuple[int, ...]) -> None: + enc = V2ChunkKeyEncoding(separator=separator) # type: ignore[arg-type] + key = enc.encode_chunk_key(coords) + assert enc.decode_chunk_key(key) == coords + + +@pytest.mark.parametrize( + "encoding", + [DefaultChunkKeyEncoding(separator="/"), V2ChunkKeyEncoding(separator=".")], +) +def test_supports_decode(encoding: DefaultChunkKeyEncoding | V2ChunkKeyEncoding) -> None: + assert encoding.supports_decode() is True + + +def test_default_decode_rejects_garbage() -> None: + enc = DefaultChunkKeyEncoding(separator="/") + for bad in ["zarr.json", "c/x", "0/0", "cc/0", "c/"]: + with pytest.raises(ValueError): + enc.decode_chunk_key(bad) + + +def test_encoding_is_chunk_key_grid_bounds() -> None: + enc = DefaultChunkKeyEncoding(separator="/") + grid = (4, 3) + assert enc.is_chunk_key("c/0/0", grid_shape=grid) + assert enc.is_chunk_key("c/3/2", grid_shape=grid) # last cell + assert not enc.is_chunk_key("c/4/0", grid_shape=grid) # row out of bounds + assert not enc.is_chunk_key("c/0/3", grid_shape=grid) # col out of bounds + assert not enc.is_chunk_key("c/0", grid_shape=grid) # wrong dimensionality + assert not enc.is_chunk_key("c/00/0", grid_shape=grid) # 
non-canonical (zero padded) + assert not enc.is_chunk_key("c/-1/0", grid_shape=grid) # negative + assert not enc.is_chunk_key("zarr.json", grid_shape=grid) # metadata document + assert not enc.is_chunk_key("foreign", grid_shape=grid) # foreign key + + +def test_encoding_is_chunk_key_scalar() -> None: + # A zero-dimensional array stores its single chunk under a special key. + assert DefaultChunkKeyEncoding(separator="/").is_chunk_key("c", grid_shape=()) + assert not DefaultChunkKeyEncoding(separator="/").is_chunk_key("c/0", grid_shape=()) + assert V2ChunkKeyEncoding(separator=".").is_chunk_key("0", grid_shape=()) + + +@pytest.mark.parametrize("zarr_format", [2, 3]) +def test_metadata_is_chunk_key(zarr_format: int) -> None: + g = zarr.open_group(MemoryStore(), mode="w", zarr_format=zarr_format) + arr = g.create_array("d", shape=(100, 80), chunks=(10, 20), dtype="int64") + m = arr.metadata + enc = m.chunk_key_encoding + + assert m.is_chunk_key(enc.encode_chunk_key((0, 0))) + assert m.is_chunk_key(enc.encode_chunk_key((9, 3))) # last chunk: ceil(100/10)-1, ceil(80/20)-1 + assert not m.is_chunk_key(enc.encode_chunk_key((10, 0))) # out of bounds + assert not m.is_chunk_key(enc.encode_chunk_key((0,))) # wrong dimensionality + assert not m.is_chunk_key("zarr.json") + assert not m.is_chunk_key(".zarray") + assert not m.is_chunk_key("not_a_chunk_key") + + +def test_metadata_is_chunk_key_sharded() -> None: + # For a sharded array the stored objects are shards, keyed over the shard grid. 
+ g = zarr.open_group(MemoryStore(), mode="w", zarr_format=3) + arr = g.create_array("s", shape=(100,), chunks=(10,), shards=(50,), dtype="int64") + m = arr.metadata + assert m._stored_object_grid_shape == (2,) # ceil(100 / 50) shards + enc = m.chunk_key_encoding + assert m.is_chunk_key(enc.encode_chunk_key((0,))) + assert m.is_chunk_key(enc.encode_chunk_key((1,))) + assert not m.is_chunk_key(enc.encode_chunk_key((2,))) # only 2 shards exist diff --git a/tests/test_config.py b/tests/test_config.py index a758378dc7..344bcfb823 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -61,6 +61,7 @@ def test_config_defaults_set() -> None: "sharding_coalesce_max_bytes": 16 << 20, }, "async": {"concurrency": 10, "timeout": None}, + "copy": {"write_order": "data_first"}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/tests/test_group.py b/tests/test_group.py index 692b88c8af..ec68c02f5f 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -8,6 +8,7 @@ import re import time import warnings +from collections.abc import Callable from typing import TYPE_CHECKING, Any, Literal, get_args import numpy as np @@ -61,6 +62,91 @@ from zarr.core.common import JSON, ZarrFormat +def is_deprecated(method: Callable[..., Any]) -> bool: + """Check if a method is marked as deprecated.""" + # Check for @deprecated decorator + return hasattr(method, "__deprecated__") or ( + hasattr(method, "__wrapped__") and hasattr(method.__wrapped__, "__deprecated__") + ) + + +def get_method_names(cls: type) -> list[str]: + """Extract public method names from a class, excluding deprecated methods.""" + return [ + name + for name, method in inspect.getmembers(cls, predicate=inspect.isfunction) + if not name.startswith("_") and not is_deprecated(method) + ] + + +def get_method_signature(cls: type, method_name: str) -> dict[str, Any]: + """Get the signature of a method from a class.""" + method = getattr(cls, method_name) + sig = 
inspect.signature(method) + return {name: param for name, param in sig.parameters.items() if name != "self"} + + +# TODO Go one by one through the methods in skipped and fix the mismatches. +@pytest.mark.parametrize( + ("sync_class", "async_class", "skip_methods"), + [ + ( + Group, + AsyncGroup, + ["create", "update_attributes_async", "get", "require_array", "require_group"], + ) + ], +) +def test_class_method_parameters_match( + sync_class: type, async_class: type, skip_methods: list[str] +) -> None: + """ + Test that methods for classes and their async counterparts match. + + Tests that the parameters for sync and async methods match. This test, + tests parameter names, types, and default values. + """ + + method_names = get_method_names(sync_class) + for method in skip_methods: + method_names.remove(method) + + for method_name in method_names: + assert hasattr(async_class, method_name), ( + f"Async class {async_class.__name__} missing method '{method_name}'" + ) + + sync_params = get_method_signature(sync_class, method_name) + async_params = get_method_signature(async_class, method_name) + + sync_param_names = set(sync_params.keys()) + async_param_names = set(async_params.keys()) + + assert sync_param_names == async_param_names, ( + f"Parameter names don't match for '{method_name}'. 
" + f"Sync: {sync_param_names}, Async: {async_param_names}" + ) + + mismatches = [] + for param_name in sync_params: + sync_param = sync_params[param_name] + async_param = async_params[param_name] + + if sync_param.annotation != async_param.annotation: + mismatches.append( + f"{param_name}: annotation mismatch " + f"(sync: {sync_param.annotation}, async: {async_param.annotation})" + ) + + if sync_param.default != async_param.default: + mismatches.append( + f"{param_name}: default mismatch " + f"(sync: {sync_param.default}, async: {async_param.default})" + ) + + assert mismatches == [], f"Parameter mismatches in '{method_name}': {mismatches}" + + @pytest.fixture(params=["local", "memory", "zip"]) async def store(request: pytest.FixtureRequest, tmp_path: pathlib.Path) -> Store: result = await parse_store(request.param, str(tmp_path)) @@ -248,6 +334,327 @@ def test_group_members(store: Store, zarr_format: ZarrFormat, consolidated_metad members_observed = group.members(max_depth=-1) +@pytest.fixture +def copy_to_test_data( + request: pytest.FixtureRequest, +) -> tuple[Group, np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]], int, bool]: + """Fixture that creates test data for copy_to tests.""" + zarr_format, shards, consolidate_metadata, src_path = request.param + + src_store = MemoryStore() + src = zarr.open_group( + src_store, + path=src_path or None, + mode="w", + zarr_format=zarr_format, + attributes={"root": True}, + ) + + subgroup = src.create_group("subgroup", attributes={"subgroup": True}) + + subgroup_arr_data = np.arange(50) + subgroup.create_array( + "subgroup_dataset", + shape=(50,), + chunks=(10,), + shards=shards, + dtype=subgroup_arr_data.dtype, + ) + subgroup["subgroup_dataset"] = subgroup_arr_data + + arr_data = np.arange(100) + src.create_array( + "dataset", + shape=(100,), + chunks=(10,), + shards=shards, + dtype=arr_data.dtype, + ) + src["dataset"] = arr_data + + src = consolidate(src_store, src, consolidate_metadata, zarr_format, 
path=src_path) + + return src, arr_data, subgroup_arr_data, zarr_format, consolidate_metadata + + +def consolidate( + src_store: MemoryStore, + src: Group, + consolidate_metadata: bool, + zarr_format: int, + path: str = "", +) -> Group: + if consolidate_metadata: + subgroup_path = f"{path}/subgroup" if path else "subgroup" + if zarr_format == 3: + with pytest.warns(ZarrUserWarning, match="Consolidated metadata is currently"): + src = zarr.consolidate_metadata(src_store, path=path) + with pytest.warns(ZarrUserWarning, match="Consolidated metadata is currently"): + zarr.consolidate_metadata(src_store, path=subgroup_path) + else: + src = zarr.consolidate_metadata(src_store, path=path) + zarr.consolidate_metadata(src_store, path=subgroup_path) + return src + + +def check_consolidated_metadata( + dst_store_root: MemoryStore, + consolidate_metadata: bool, + zarr_format: int, + path: str | None = None, +) -> None: + sub_path = "subgroup" if path is None else f"{path}/subgroup" + if consolidate_metadata: + assert zarr.open_group(dst_store_root, path=path).metadata.consolidated_metadata + if zarr_format == 3: + assert zarr.open_group(dst_store_root, path=sub_path).metadata.consolidated_metadata + else: + assert not zarr.open_group(dst_store_root, path=path).metadata.consolidated_metadata + assert not zarr.open_group(dst_store_root, path=sub_path).metadata.consolidated_metadata + + +@pytest.mark.parametrize( + "copy_to_test_data", + [ + (2, None, False, ""), + (2, None, True, ""), + (3, (50,), False, ""), + (3, (50,), True, ""), + (3, (50,), False, "nested/source"), + ], + indirect=True, +) +def test_copy_to( + copy_to_test_data: tuple[ + Group, np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]], int, bool + ], +) -> None: + src, arr_data, subgroup_arr_data, zarr_format, consolidate_metadata = copy_to_test_data + + dst_store = MemoryStore() + + dst = src.copy_to(dst_store, overwrite=True) + + assert dst.attrs.get("root") is True + + subgroup = dst["subgroup"] + assert
isinstance(subgroup, Group) + assert subgroup.attrs.get("subgroup") is True + + copied_arr = dst["dataset"] + copied_data = copied_arr[:] + assert np.array_equal(copied_data, arr_data) + + copied_subgroup_arr = subgroup["subgroup_dataset"] + copied_subgroup_data = copied_subgroup_arr[:] + assert np.array_equal(copied_subgroup_data, subgroup_arr_data) + + check_consolidated_metadata(dst_store, consolidate_metadata, zarr_format) + + +@pytest.mark.parametrize( + "copy_to_test_data", + [ + (2, None, False, ""), + (2, None, True, ""), + (3, (50,), False, ""), + (3, (50,), True, ""), + (3, (50,), False, "nested/source"), + ], + indirect=True, +) +def test_copy_to_with_path( + copy_to_test_data: tuple[ + Group, np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]], int, bool + ], +) -> None: + src, arr_data, subgroup_arr_data, zarr_format, consolidate_metadata = copy_to_test_data + + dst_store = MemoryStore() + dst = src.copy_to(dst_store, path="sub/snapshot", overwrite=True) + + assert dst.attrs.get("root") is True + assert dst.store_path.path == "sub/snapshot" + + subgroup = dst["subgroup"] + assert isinstance(subgroup, Group) + assert subgroup.attrs.get("subgroup") is True + + copied_arr = dst["dataset"] + copied_data = copied_arr[:] + assert np.array_equal(copied_data, arr_data) + + copied_subgroup_arr = subgroup["subgroup_dataset"] + copied_subgroup_data = copied_subgroup_arr[:] + assert np.array_equal(copied_subgroup_data, subgroup_arr_data) + + check_consolidated_metadata(dst_store, consolidate_metadata, zarr_format, path="sub/snapshot") + + +def test_copy_to_overwrite_false_raises(zarr_format: ZarrFormat) -> None: + """copy_to with overwrite=False raises when a node already exists at the destination.""" + src = zarr.open_group(MemoryStore(), mode="w", zarr_format=zarr_format) + dst_store = MemoryStore() + zarr.open_group(dst_store, mode="w", zarr_format=zarr_format) + + with pytest.raises((ContainsGroupError, ContainsArrayError)): + src.copy_to(dst_store, 
overwrite=False) + + +def test_copy_to_skips_foreign_keys(zarr_format: ZarrFormat) -> None: + """copy_to copies only the data keys an array's metadata defines, not foreign keys under its prefix.""" + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + arr = src.create_array("dataset", shape=(10,), chunks=(5,), dtype="int64") + arr[:] = np.arange(10) + + # Plant a key under the array prefix that is not a defined chunk or metadata key. + foreign_key = "dataset/not_a_chunk_or_metadata_key" + sync(src_store.set(foreign_key, default_buffer_prototype().buffer.from_bytes(b"junk"))) + + dst_store = MemoryStore() + src.copy_to(dst_store, overwrite=True) + + dst_keys = sync(_collect_aiterator(dst_store.list())) + assert foreign_key not in dst_keys + # The array data itself was copied. + assert np.array_equal(zarr.open_group(dst_store)["dataset"][:], np.arange(10)) + + +def test_copy_to_skips_unwritten_chunks(zarr_format: ZarrFormat) -> None: + """copy_to reproduces exactly the source keys, materializing no unwritten chunks (sparse arrays).""" + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + # 10 chunks, but write only the first, leaving 9 chunks absent from the store. + arr = src.create_array("dataset", shape=(100,), chunks=(10,), dtype="int64") + arr[0:10] = np.arange(10) + + src_keys = set(sync(_collect_aiterator(src_store.list()))) + # Guard: the source really is sparse, so the test can detect spuriously created chunks. + assert sum(1 for k in src_keys if "dataset" in k and "0" in k.rsplit("/", 1)[-1]) == 1 + + dst_store = MemoryStore() + src.copy_to(dst_store, overwrite=True) + + # A faithful copy produces exactly the same keys: no unwritten chunks were created. 
+ assert set(sync(_collect_aiterator(dst_store.list()))) == src_keys + + +class _RecordingMemoryStore(MemoryStore): + """A MemoryStore that records the order in which keys are written via ``set``.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.set_order: list[str] = [] + + async def set(self, key: str, value: Any, byte_range: Any = None) -> None: + await super().set(key, value, byte_range) + self.set_order.append(key) + + +def _is_metadata_key(key: str) -> bool: + return key.rsplit("/", 1)[-1] in ("zarr.json", ".zarray", ".zgroup", ".zattrs", ".zmetadata") + + +def _make_copy_source(zarr_format: ZarrFormat) -> Group: + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + arr = src.create_array("dataset", shape=(20,), chunks=(5,), dtype="int64") + arr[:] = np.arange(20) + return src + + +def test_copy_to_data_first_writes_data_then_metadata(zarr_format: ZarrFormat) -> None: + """data_first: every data object is written before any metadata, and root metadata is last.""" + src = _make_copy_source(zarr_format) + dst_store = _RecordingMemoryStore() + src.copy_to(dst_store, overwrite=True, write_order="data_first") + + order = dst_store.set_order + data_idx = [i for i, k in enumerate(order) if not _is_metadata_key(k)] + meta_idx = [i for i, k in enumerate(order) if _is_metadata_key(k)] + assert data_idx + assert meta_idx + # All data precedes all metadata. + assert max(data_idx) < min(meta_idx) + # The root group's metadata document (the hierarchy index) is written dead last. 
+ assert _is_metadata_key(order[-1]) + assert "/" not in order[-1] + + +def test_copy_to_metadata_first_writes_metadata_then_data(zarr_format: ZarrFormat) -> None: + """metadata_first: all metadata is written before any data object.""" + src = _make_copy_source(zarr_format) + dst_store = _RecordingMemoryStore() + src.copy_to(dst_store, overwrite=True, write_order="metadata_first") + + order = dst_store.set_order + data_idx = [i for i, k in enumerate(order) if not _is_metadata_key(k)] + meta_idx = [i for i, k in enumerate(order) if _is_metadata_key(k)] + assert data_idx + assert meta_idx + assert max(meta_idx) < min(data_idx) + + +def test_copy_to_default_write_order_follows_config(zarr_format: ZarrFormat) -> None: + """With no write_order argument, copy_to honors the copy.write_order config value.""" + src = _make_copy_source(zarr_format) + dst_store = _RecordingMemoryStore() + with zarr_config.set({"copy.write_order": "metadata_first"}): + src.copy_to(dst_store, overwrite=True) + order = dst_store.set_order + data_idx = [i for i, k in enumerate(order) if not _is_metadata_key(k)] + meta_idx = [i for i, k in enumerate(order) if _is_metadata_key(k)] + assert max(meta_idx) < min(data_idx) + + +def test_copy_to_invalid_write_order_raises(zarr_format: ZarrFormat) -> None: + src = zarr.open_group(MemoryStore(), mode="w", zarr_format=zarr_format) + with pytest.raises(ValueError, match="write_order"): + src.copy_to(MemoryStore(), overwrite=True, write_order="sideways") # type: ignore[arg-type] + + +def test_copy_to_streams_array_with_huge_grid(zarr_format: ZarrFormat) -> None: + """ + copy_to streams stored keys rather than enumerating the chunk grid. + + We build a sparse array whose chunk grid is astronomically large (~1e12 chunks) by writing a + small array and then rewriting its stored metadata to claim a huge shape -- the situation that + arises when opening an array written by another implementation. 
``create_array`` itself does + work proportional to the chunk count, so the array is constructed via metadata surgery. Any + copy that materialized the chunk grid would hang here; only a streaming copy (list stored keys + + O(1) recognition) can reproduce exactly the keys that exist. + """ + huge = 1_000_000_000_000 + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + arr = src.create_array("dataset", shape=(4,), chunks=(1,), dtype="int64") + arr[0] = 10 + arr[3] = 13 # chunk index 3 stays in-bounds after the shape grows + + # Rewrite the stored metadata to claim a huge shape, leaving the two chunks in place. + proto = default_buffer_prototype() + meta_key = "dataset/zarr.json" if zarr_format == 3 else "dataset/.zarray" + raw = sync(src_store.get(meta_key, prototype=proto)) + assert raw is not None + doc = json.loads(raw.to_bytes()) + doc["shape"] = [huge] + sync(src_store.set(meta_key, proto.buffer.from_bytes(json.dumps(doc).encode()))) + + reopened = zarr.open_group(src_store) + src_keys = set(sync(_collect_aiterator(src_store.list()))) + + dst_store = MemoryStore() + reopened.copy_to(dst_store, overwrite=True) + + # A faithful copy reproduces exactly the keys that exist in the source. + assert set(sync(_collect_aiterator(dst_store.list()))) == src_keys + copied = zarr.open_group(dst_store)["dataset"] + assert isinstance(copied, Array) + assert copied.shape == (huge,) + assert copied[3] == 13 + + def test_group(store: Store, zarr_format: ZarrFormat) -> None: """ Test basic Group routines.