From 905d972101b86c5264ca689b42f03a7881311288 Mon Sep 17 00:00:00 2001 From: d-v-b-agent Date: Tue, 23 Jun 2026 13:18:57 +0000 Subject: [PATCH 1/4] feat(group): add copy_to convenience method for Group Adds Group.copy_to / AsyncGroup.copy_to, which copies a group and all of its contents (sub-groups and arrays) to a destination store, which may be a different store type than the source. The copy is a raw byte-level transfer of metadata documents and chunk/shard objects -- it does not decode or re-encode array data -- and reproduces the source keys exactly, including consolidated metadata. Supports a destination path, overwrite, and control over whether child consolidated metadata is used during traversal. Original work by Wouter-Michiel Vierdag and Davis Bennett in zarr-developers/zarr-python#3612. Co-authored-by: Wouter-Michiel Vierdag Co-authored-by: Davis Bennett --- changes/3612.feature.md | 3 + docs/user-guide/groups.md | 10 ++ src/zarr/core/array.py | 42 ++++- src/zarr/core/group.py | 161 +++++++++++++++++- src/zarr/testing/strategies.py | 1 + tests/test_group.py | 292 +++++++++++++++++++++++++++++++++ 6 files changed, 504 insertions(+), 5 deletions(-) create mode 100644 changes/3612.feature.md diff --git a/changes/3612.feature.md b/changes/3612.feature.md new file mode 100644 index 0000000000..821159d418 --- /dev/null +++ b/changes/3612.feature.md @@ -0,0 +1,3 @@ +Added the `zarr.Group.copy_to` convenience method, which copies a group and all of its contents to a +destination store. The destination store may be of a different type than the group's original +store. Metadata, including consolidated metadata, is copied as is. \ No newline at end of file diff --git a/docs/user-guide/groups.md b/docs/user-guide/groups.md index 5faa26a281..277a921863 100644 --- a/docs/user-guide/groups.md +++ b/docs/user-guide/groups.md @@ -131,3 +131,13 @@ Groups also have the [`zarr.Group.tree`][] method, e.g.: print(root.tree()) ``` +!!! 
note + [`zarr.Group.tree`][] requires the optional [rich](https://rich.readthedocs.io/en/stable/) dependency. It can be installed with the `[tree]` extra. + +You can copy a Group including consolidated metadata to a new destination store +(type of store can differ from the source store) using the `copy_to` method: + +```python exec="true" session="groups" source="above" result="ansi" +destination_store = zarr.storage.MemoryStore() +new_group = root.copy_to(destination_store, overwrite=True) +``` diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 977520b12e..ebc9b38d94 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -60,6 +60,8 @@ ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, + ZGROUP_JSON, + ZMETADATA_V2_JSON, ChunksLike, DimensionNamesLike, MemoryOrder, @@ -156,6 +158,7 @@ from zarr.abc.store import Store from zarr.codecs.sharding import IndexLocation from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar + from zarr.core.group import GroupMetadata from zarr.storage import StoreLike from zarr.types import AnyArray, AnyAsyncArray, ArrayV2, ArrayV3, AsyncArrayV2, AsyncArrayV3 @@ -3975,9 +3978,9 @@ async def _shards_initialized( store_contents = [ x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path) ] - store_contents_relative = [ + store_contents_relative = { _relativize_path(path=key, prefix=array.store_path.path) for key in store_contents - ] + } return tuple( chunk_key for chunk_key in array._iter_shard_keys() if chunk_key in store_contents_relative ) @@ -5193,6 +5196,41 @@ def _iter_shard_keys( return (array.metadata.encode_chunk_key(k) for k in _iter) +def _iter_metadata_keys( + metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata, +) -> Iterator[str]: + """ + Iterate over the storage keys of the metadata documents for a node, relative to the node's path. + + These keys are defined entirely by the node's zarr format and type. 
This is the closed set of + metadata keys a node may own; some keys (e.g. v2 consolidated metadata) are only present in the + store under certain conditions, so callers that copy or read these keys should tolerate absent + ones rather than assuming all are present. + + Parameters + ---------- + metadata : ArrayV2Metadata | ArrayV3Metadata | GroupMetadata + The metadata of the node to enumerate keys for. + + Yields + ------ + key: str + The relative storage key of each metadata document. + """ + from zarr.core.group import GroupMetadata + + if metadata.zarr_format == 3: + yield ZARR_JSON + elif isinstance(metadata, GroupMetadata): + yield ZGROUP_JSON + yield ZATTRS_JSON + # Consolidated metadata is only written when the group is consolidated; it may be absent. + yield ZMETADATA_V2_JSON + else: + yield ZARRAY_JSON + yield ZATTRS_JSON + + def _iter_shard_regions( array: AnyArray | AnyAsyncArray, *, diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index 52eaa3e144..4c949f0188 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -27,7 +27,9 @@ FiltersLike, SerializerLike, ShardsLike, + _iter_metadata_keys, _parse_deprecated_compressor, + _shards_initialized, create_array, ) from zarr.core.attributes import Attributes @@ -464,6 +466,7 @@ async def from_store( store: StoreLike, *, attributes: dict[str, Any] | None = None, + consolidated_metadata: ConsolidatedMetadata | None = None, overwrite: bool = False, zarr_format: ZarrFormat = 3, ) -> AsyncGroup: @@ -478,7 +481,11 @@ async def from_store( await ensure_no_existing_node(store_path, zarr_format=zarr_format) attributes = attributes or {} group = cls( - metadata=GroupMetadata(attributes=attributes, zarr_format=zarr_format), + metadata=GroupMetadata( + attributes=attributes, + consolidated_metadata=consolidated_metadata, + zarr_format=zarr_format, + ), store_path=store_path, ) await group._save_metadata(ensure_parents=True) @@ -689,6 +696,83 @@ def from_dict( store_path=store_path, ) + async def 
copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + use_consolidated_for_children: bool = True, + ) -> AsyncGroup: + """ + Copy this group and all its contents to a new store. + + This performs a raw byte-level copy of all data, without decoding or + re-encoding array contents. + + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Group path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + use_consolidated_for_children : bool, default True + Whether to use the consolidated metadata of child groups when iterating over the store contents. + Note that this only affects groups loaded from the store. If the current Group already has + consolidated metadata, it will always be used. + + Returns + ------- + AsyncGroup + The new group in the target store. + """ + target_store_path = await make_store_path(store, path=path or "") + + if overwrite: + dst_store = target_store_path.store + dst_prefix = target_store_path.path + "/" if target_store_path.path else "" + async for key in dst_store.list_prefix(dst_prefix): + await dst_store.delete(key) + else: + await ensure_no_existing_node(target_store_path, zarr_format=self.metadata.zarr_format) + + src_store = self.store_path.store + src_prefix = self.store_path.path + "/" if self.store_path.path else "" + dst_store = target_store_path.store + dst_prefix = target_store_path.path + "/" if target_store_path.path else "" + + prototype = default_buffer_prototype() + + async def _copy_key(src_key: str, dst_key: str) -> None: + """Copy a single key from source to destination store.""" + data = await src_store.get(src_key, prototype=prototype) + if data is not None: + await dst_store.set(dst_key, data) + + async def _copy_node(node_prefix: str, relative_keys: Iterable[str]) -> None: + """Copy a node's keys, which are relative to ``node_prefix`` within each 
node's path.""" + for relative_key in relative_keys: + node_key = f"{node_prefix}{relative_key}" if node_prefix else relative_key + await _copy_key(f"{src_prefix}{node_key}", f"{dst_prefix}{node_key}") + + # Copy the root group's metadata keys. + await _copy_node("", _iter_metadata_keys(self.metadata)) + + # Copy all children discovered via members(). + async for child_path, member in self.members( + max_depth=None, use_consolidated_for_children=use_consolidated_for_children + ): + node_prefix = f"{child_path}/" + await _copy_node(node_prefix, _iter_metadata_keys(member.metadata)) + if isinstance(member, AsyncArray): + # Copy only the data keys that the array's metadata defines and that exist in the + # store. This skips both foreign keys under the array prefix and unwritten chunks. + await _copy_node(node_prefix, await _shards_initialized(member)) + + return await type(self).open(target_store_path, zarr_format=self.metadata.zarr_format) + async def setitem(self, key: str, value: Any) -> None: """ Fastpath for creating a new array @@ -937,6 +1021,7 @@ async def create_group( *, overwrite: bool = False, attributes: dict[str, Any] | None = None, + consolidated_metadata: ConsolidatedMetadata | None = None, ) -> AsyncGroup: """Create a sub-group. @@ -948,6 +1033,9 @@ async def create_group( If True, do not raise an error if the group already exists. attributes : dict, optional Group attributes. + consolidated_metadata : ConsolidatedMetadata, optional + Consolidated Zarr metadata mapping that represents the entire hierarchy's + group and array metadata collected into a single dictionary. 
Returns ------- @@ -957,6 +1045,7 @@ async def create_group( return await type(self).from_store( self.store_path / name, attributes=attributes, + consolidated_metadata=consolidated_metadata, overwrite=overwrite, zarr_format=self.metadata.zarr_format, ) @@ -1741,6 +1830,7 @@ def from_store( store: StoreLike, *, attributes: dict[str, Any] | None = None, + consolidated_metadata: ConsolidatedMetadata | None = None, zarr_format: ZarrFormat = 3, overwrite: bool = False, ) -> Group: @@ -1754,6 +1844,8 @@ def from_store( for a description of all valid StoreLike values. attributes : dict, optional A dictionary of JSON-serializable values with user-defined attributes. + consolidated_metadata : ConsolidatedMetadata, optional + Consolidated Metadata for this Group. This should contain metadata of child nodes below this group. zarr_format : {2, 3}, optional Zarr storage format version. overwrite : bool, optional @@ -1773,6 +1865,7 @@ def from_store( AsyncGroup.from_store( store, attributes=attributes, + consolidated_metadata=consolidated_metadata, overwrite=overwrite, zarr_format=zarr_format, ), @@ -1805,6 +1898,46 @@ def open( obj = sync(AsyncGroup.open(store, zarr_format=zarr_format)) return cls(obj) + def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + use_consolidated_for_children: bool = True, + ) -> Group: + """ + Copy this group and all its contents to a new store. + + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Group path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + use_consolidated_for_children : bool, default True + Whether to use the consolidated metadata of child groups when iterating over the store contents. + Note that this only affects groups loaded from the store. If the current Group already has + consolidated metadata, it will always be used. 
+ + Returns + ------- + Group + The new group in the target store. + """ + return Group( + sync( + self._async_group.copy_to( + store=store, + path=path, + overwrite=overwrite, + use_consolidated_for_children=use_consolidated_for_children, + ) + ) + ) + def __getitem__(self, path: str) -> AnyArray | Group: """Obtain a group member. @@ -2327,13 +2460,26 @@ def tree( self._async_group.tree(expand=expand, level=level, max_nodes=max_nodes, plain=plain) ) - def create_group(self, name: str, **kwargs: Any) -> Group: + def create_group( + self, + name: str, + overwrite: bool = False, + attributes: dict[str, Any] | None = None, + consolidated_metadata: ConsolidatedMetadata | None = None, + ) -> Group: """Create a sub-group. Parameters ---------- name : str Name of the new subgroup. + overwrite : bool, optional + If True, do not raise an error if the group already exists. + attributes : dict, optional + Group attributes. + consolidated_metadata : ConsolidatedMetadata, optional + Consolidated Zarr metadata mapping that represents the entire hierarchy's + group and array metadata collected into a single dictionary. Returns ------- @@ -2347,7 +2493,16 @@ def create_group(self, name: str, **kwargs: Any) -> Group: >>> subgroup """ - return Group(self._sync(self._async_group.create_group(name, **kwargs))) + return Group( + self._sync( + self._async_group.create_group( + name, + overwrite=overwrite, + attributes=attributes, + consolidated_metadata=consolidated_metadata, + ) + ) + ) def require_group(self, name: str, **kwargs: Any) -> Group: """Obtain a sub-group, creating one if it doesn't exist. 
diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index f2c83677cc..d0bbcc012e 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -536,6 +536,7 @@ def basic_indices( allow_ellipsis: TrueOrFalse = True, ) -> Any: """Basic indices without unsupported negative slices.""" + strategy = npst.basic_indices( shape=shape, min_dims=min_dims, diff --git a/tests/test_group.py b/tests/test_group.py index 692b88c8af..2defce6ff8 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -8,6 +8,7 @@ import re import time import warnings +from collections.abc import Callable from typing import TYPE_CHECKING, Any, Literal, get_args import numpy as np @@ -61,6 +62,91 @@ from zarr.core.common import JSON, ZarrFormat +def is_deprecated(method: Callable[..., Any]) -> bool: + """Check if a method is marked as deprecated.""" + # Check for @deprecated decorator + return hasattr(method, "__deprecated__") or ( + hasattr(method, "__wrapped__") and hasattr(method.__wrapped__, "__deprecated__") + ) + + +def get_method_names(cls: type) -> list[str]: + """Extract public method names from a class, excluding deprecated methods.""" + return [ + name + for name, method in inspect.getmembers(cls, predicate=inspect.isfunction) + if not name.startswith("_") and not is_deprecated(method) + ] + + +def get_method_signature(cls: type, method_name: str) -> dict[str, Any]: + """Get the signature of a method from a class.""" + method = getattr(cls, method_name) + sig = inspect.signature(method) + return {name: param for name, param in sig.parameters.items() if name != "self"} + + +# TODO Go one by one through the methods in skipped and fix the mismatches. 
+@pytest.mark.parametrize( + ("sync_class", "async_class", "skip_methods"), + [ + ( + Group, + AsyncGroup, + ["create", "update_attributes_async", "get", "require_array", "require_group"], + ) + ], +) +def test_class_method_parameters_match( + sync_class: type, async_class: type, skip_methods: list[str] +) -> None: + """ + Test that methods for classes and their async counterparts match. + + Tests that the parameters for sync and async methods match. This test, + tests parameter names, types, and default values. + """ + + method_names = get_method_names(sync_class) + for method in skip_methods: + method_names.remove(method) + + for method_name in method_names: + assert hasattr(async_class, method_name), ( + f"Async class {async_class.__name__} missing method '{method_name}'" + ) + + sync_params = get_method_signature(sync_class, method_name) + async_params = get_method_signature(async_class, method_name) + + sync_param_names = set(sync_params.keys()) + async_param_names = set(async_params.keys()) + + assert sync_param_names == async_param_names, ( + f"Parameter names don't match for '{method_name}'. 
" + f"Sync: {sync_param_names}, Async: {async_param_names}" + ) + + mismatches = [] + for param_name in sync_params: + sync_param = sync_params[param_name] + async_param = async_params[param_name] + + if sync_param.annotation != async_param.annotation: + mismatches.append( + f"{param_name}: annotation mismatch " + f"(sync: {sync_param.annotation}, async: {async_param.annotation})" + ) + + if sync_param.default != async_param.default: + mismatches.append( + f"{param_name}: default mismatch " + f"(sync: {sync_param.default}, async: {async_param.default})" + ) + + assert mismatches == [], f"Parameter mismatches in '{method_name}': {mismatches}" + + @pytest.fixture(params=["local", "memory", "zip"]) async def store(request: pytest.FixtureRequest, tmp_path: pathlib.Path) -> Store: result = await parse_store(request.param, str(tmp_path)) @@ -248,6 +334,212 @@ def test_group_members(store: Store, zarr_format: ZarrFormat, consolidated_metad members_observed = group.members(max_depth=-1) +@pytest.fixture +def copy_to_test_data( + request: pytest.FixtureRequest, +) -> tuple[Group, np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]], int, bool]: + """Fixture that creates test data for copy_to tests.""" + zarr_format, shards, consolidate_metadata, src_path = request.param + + src_store = MemoryStore() + src = zarr.open_group( + src_store, + path=src_path or None, + mode="w", + zarr_format=zarr_format, + attributes={"root": True}, + ) + + subgroup = src.create_group("subgroup", attributes={"subgroup": True}) + + subgroup_arr_data = np.arange(50) + subgroup.create_array( + "subgroup_dataset", + shape=(50,), + chunks=(10,), + shards=shards, + dtype=subgroup_arr_data.dtype, + ) + subgroup["subgroup_dataset"] = subgroup_arr_data + + arr_data = np.arange(100) + src.create_array( + "dataset", + shape=(100,), + chunks=(10,), + shards=shards, + dtype=arr_data.dtype, + ) + src["dataset"] = arr_data + + src = consolidate(src_store, src, consolidate_metadata, zarr_format, 
path=src_path) + + return src, arr_data, subgroup_arr_data, zarr_format, consolidate_metadata + + +def consolidate( + src_store: MemoryStore, + src: Group, + consolidate_metadata: bool, + zarr_format: int, + path: str = "", +) -> Group: + if consolidate_metadata: + subgroup_path = f"{path}/subgroup" if path else "subgroup" + if zarr_format == 3: + with pytest.warns(ZarrUserWarning, match="Consolidated metadata is currently"): + src = zarr.consolidate_metadata(src_store, path=path) + with pytest.warns(ZarrUserWarning, match="Consolidated metadata is currently"): + zarr.consolidate_metadata(src_store, path=subgroup_path) + else: + src = zarr.consolidate_metadata(src_store, path=path) + zarr.consolidate_metadata(src_store, path=subgroup_path) + return src + + +def check_consolidated_metadata( + dst_store_root: MemoryStore, + consolidate_metadata: bool, + zarr_format: int, + path: str | None = None, +) -> None: + sub_path = "subgroup" if path is None else f"{path}/subgroup" + if consolidate_metadata: + assert zarr.open_group(dst_store_root, path=path).metadata.consolidated_metadata + if zarr_format == 3: + assert zarr.open_group(dst_store_root, path=sub_path).metadata.consolidated_metadata + else: + assert not zarr.open_group(dst_store_root).metadata.consolidated_metadata + assert not zarr.open_group(dst_store_root, path=sub_path).metadata.consolidated_metadata + + +@pytest.mark.parametrize( + "copy_to_test_data", + [ + (2, None, False, ""), + (2, None, True, ""), + (3, (50,), False, ""), + (3, (50,), True, ""), + (3, (50,), False, "nested/source"), + ], + indirect=True, +) +def test_copy_to( + copy_to_test_data: tuple[ + Group, np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]], int, bool + ], +) -> None: + src, arr_data, subgroup_arr_data, zarr_format, consolidate_metadata = copy_to_test_data + + dst_store = MemoryStore() + + dst = src.copy_to(dst_store, overwrite=True) + + assert dst.attrs.get("root") is True + + subgroup = dst["subgroup"] + assert 
isinstance(subgroup, Group) + assert subgroup.attrs.get("subgroup") is True + + copied_arr = dst["dataset"] + copied_data = copied_arr[:] + assert np.array_equal(copied_data, arr_data) + + copied_subgroup_arr = subgroup["subgroup_dataset"] + copied_subgroup_data = copied_subgroup_arr[:] + assert np.array_equal(copied_subgroup_data, subgroup_arr_data) + + check_consolidated_metadata(dst_store, consolidate_metadata, zarr_format) + + +@pytest.mark.parametrize( + "copy_to_test_data", + [ + (2, None, False, ""), + (2, None, True, ""), + (3, (50,), False, ""), + (3, (50,), True, ""), + (3, (50,), False, "nested/source"), + ], + indirect=True, +) +def test_copy_to_with_path( + copy_to_test_data: tuple[ + Group, np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]], int, bool + ], +) -> None: + src, arr_data, subgroup_arr_data, zarr_format, consolidate_metadata = copy_to_test_data + + dst_store = MemoryStore() + dst = src.copy_to(dst_store, path="sub/snapshot", overwrite=True) + + assert dst.attrs.get("root") is True + assert dst.store_path.path == "sub/snapshot" + + subgroup = dst["subgroup"] + assert isinstance(subgroup, Group) + assert subgroup.attrs.get("subgroup") is True + + copied_arr = dst["dataset"] + copied_data = copied_arr[:] + assert np.array_equal(copied_data, arr_data) + + copied_subgroup_arr = subgroup["subgroup_dataset"] + copied_subgroup_data = copied_subgroup_arr[:] + assert np.array_equal(copied_subgroup_data, subgroup_arr_data) + + check_consolidated_metadata(dst_store, consolidate_metadata, zarr_format, path="sub/snapshot") + + +def test_copy_to_overwrite_false_raises(zarr_format: ZarrFormat) -> None: + """copy_to with overwrite=False raises when a node already exists at the destination.""" + src = zarr.open_group(MemoryStore(), mode="w", zarr_format=zarr_format) + dst_store = MemoryStore() + zarr.open_group(dst_store, mode="w", zarr_format=zarr_format) + + with pytest.raises((ContainsGroupError, ContainsArrayError)): + src.copy_to(dst_store, 
overwrite=False) + + +def test_copy_to_skips_foreign_keys(zarr_format: ZarrFormat) -> None: + """copy_to copies only the data keys an array's metadata defines, not foreign keys under its prefix.""" + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + arr = src.create_array("dataset", shape=(10,), chunks=(5,), dtype="int64") + arr[:] = np.arange(10) + + # Plant a key under the array prefix that is not a defined chunk or metadata key. + foreign_key = "dataset/not_a_chunk_or_metadata_key" + sync(src_store.set(foreign_key, default_buffer_prototype().buffer.from_bytes(b"junk"))) + + dst_store = MemoryStore() + src.copy_to(dst_store, overwrite=True) + + dst_keys = sync(_collect_aiterator(dst_store.list())) + assert foreign_key not in dst_keys + # The array data itself was copied. + assert np.array_equal(zarr.open_group(dst_store)["dataset"][:], np.arange(10)) + + +def test_copy_to_skips_unwritten_chunks(zarr_format: ZarrFormat) -> None: + """copy_to reproduces exactly the source keys, materializing no unwritten chunks (sparse arrays).""" + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + # 10 chunks, but write only the first, leaving 9 chunks absent from the store. + arr = src.create_array("dataset", shape=(100,), chunks=(10,), dtype="int64") + arr[0:10] = np.arange(10) + + src_keys = set(sync(_collect_aiterator(src_store.list()))) + # Guard: the source really is sparse, so the test can detect spuriously created chunks. + assert sum(1 for k in src_keys if "dataset" in k and "0" in k.rsplit("/", 1)[-1]) == 1 + + dst_store = MemoryStore() + src.copy_to(dst_store, overwrite=True) + + # A faithful copy produces exactly the same keys: no unwritten chunks were created. + assert set(sync(_collect_aiterator(dst_store.list()))) == src_keys + + def test_group(store: Store, zarr_format: ZarrFormat) -> None: """ Test basic Group routines. 
From a46257aa83ae62c8854b017bf8567d97901bbb89 Mon Sep 17 00:00:00 2001 From: d-v-b-agent Date: Tue, 23 Jun 2026 13:19:13 +0000 Subject: [PATCH 2/4] perf(group): copy_to transfers objects concurrently; drop unused consolidated_metadata plumbing copy_to previously copied every metadata document and chunk/shard one at a time with sequential awaits, so on high-latency stores the wall-clock time was dominated by per-object round trips -- defeating the point of the raw byte-copy. Enumerate the full (source, destination) key set first (listing is cheap relative to moving payloads), then transfer concurrently via concurrent_map bounded by the async.concurrency config value. Per-array data key listing is likewise parallelized, and overwrite now uses store.delete_dir (which stores may implement more efficiently than a serial list+delete). Also remove the consolidated_metadata keyword added to Group/AsyncGroup from_store and create_group: copy_to copies the consolidated metadata bytes directly and never uses these parameters, they were untested, and the maintainers agreed in the PR thread to handle from_store consolidated metadata support in a separate PR. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/zarr/core/group.py | 77 ++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index 4c949f0188..c2e48834d2 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -46,6 +46,7 @@ NodeType, ShapeLike, ZarrFormat, + concurrent_map, parse_shapelike, ) from zarr.core.config import config @@ -466,7 +467,6 @@ async def from_store( store: StoreLike, *, attributes: dict[str, Any] | None = None, - consolidated_metadata: ConsolidatedMetadata | None = None, overwrite: bool = False, zarr_format: ZarrFormat = 3, ) -> AsyncGroup: @@ -481,11 +481,7 @@ async def from_store( await ensure_no_existing_node(store_path, zarr_format=zarr_format) attributes = attributes or {} group = cls( - metadata=GroupMetadata( - attributes=attributes, - consolidated_metadata=consolidated_metadata, - zarr_format=zarr_format, - ), + metadata=GroupMetadata(attributes=attributes, zarr_format=zarr_format), store_path=store_path, ) await group._save_metadata(ensure_parents=True) @@ -708,7 +704,8 @@ async def copy_to( Copy this group and all its contents to a new store. This performs a raw byte-level copy of all data, without decoding or - re-encoding array contents. + re-encoding array contents. Objects are transferred concurrently, governed by the + ``async.concurrency`` configuration value. 
Parameters ---------- @@ -731,10 +728,7 @@ async def copy_to( target_store_path = await make_store_path(store, path=path or "") if overwrite: - dst_store = target_store_path.store - dst_prefix = target_store_path.path + "/" if target_store_path.path else "" - async for key in dst_store.list_prefix(dst_prefix): - await dst_store.delete(key) + await target_store_path.store.delete_dir(target_store_path.path) else: await ensure_no_existing_node(target_store_path, zarr_format=self.metadata.zarr_format) @@ -744,32 +738,47 @@ async def copy_to( dst_prefix = target_store_path.path + "/" if target_store_path.path else "" prototype = default_buffer_prototype() + concurrency = config.get("async.concurrency") - async def _copy_key(src_key: str, dst_key: str) -> None: - """Copy a single key from source to destination store.""" - data = await src_store.get(src_key, prototype=prototype) - if data is not None: - await dst_store.set(dst_key, data) + # Enumerate every (source, destination) key pair before copying. Discovering the keys + # (listing the source store) is cheap compared with transferring chunk/shard bytes, so we + # build the full work list first and then copy the payloads concurrently below. + copy_pairs: list[tuple[str, str]] = [] - async def _copy_node(node_prefix: str, relative_keys: Iterable[str]) -> None: - """Copy a node's keys, which are relative to ``node_prefix`` within each node's path.""" + def _add_node_keys(node_prefix: str, relative_keys: Iterable[str]) -> None: + """Register a node's keys, which are relative to ``node_prefix`` within each node's path.""" for relative_key in relative_keys: node_key = f"{node_prefix}{relative_key}" if node_prefix else relative_key - await _copy_key(f"{src_prefix}{node_key}", f"{dst_prefix}{node_key}") + copy_pairs.append((f"{src_prefix}{node_key}", f"{dst_prefix}{node_key}")) - # Copy the root group's metadata keys. - await _copy_node("", _iter_metadata_keys(self.metadata)) + # The root group's metadata keys. 
+ _add_node_keys("", _iter_metadata_keys(self.metadata)) - # Copy all children discovered via members(). + # All children discovered via members(). Collect arrays separately so their stored data + # keys can be listed concurrently rather than one array at a time. + array_members: list[tuple[str, AsyncArray[Any]]] = [] async for child_path, member in self.members( max_depth=None, use_consolidated_for_children=use_consolidated_for_children ): - node_prefix = f"{child_path}/" - await _copy_node(node_prefix, _iter_metadata_keys(member.metadata)) + _add_node_keys(f"{child_path}/", _iter_metadata_keys(member.metadata)) if isinstance(member, AsyncArray): - # Copy only the data keys that the array's metadata defines and that exist in the - # store. This skips both foreign keys under the array prefix and unwritten chunks. - await _copy_node(node_prefix, await _shards_initialized(member)) + array_members.append((child_path, member)) + + # Copy only the data keys that each array's metadata defines and that exist in the store. + # This skips both foreign keys under the array prefix and unwritten (sparse) chunks. + initialized_keys = await concurrent_map( + [(member,) for _, member in array_members], _shards_initialized, concurrency + ) + for (child_path, _), data_keys in zip(array_members, initialized_keys, strict=True): + _add_node_keys(f"{child_path}/", data_keys) + + async def _copy_key(src_key: str, dst_key: str) -> None: + """Copy a single key from source to destination store.""" + data = await src_store.get(src_key, prototype=prototype) + if data is not None: + await dst_store.set(dst_key, data) + + await concurrent_map(copy_pairs, _copy_key, concurrency) return await type(self).open(target_store_path, zarr_format=self.metadata.zarr_format) @@ -1021,7 +1030,6 @@ async def create_group( *, overwrite: bool = False, attributes: dict[str, Any] | None = None, - consolidated_metadata: ConsolidatedMetadata | None = None, ) -> AsyncGroup: """Create a sub-group. 
@@ -1033,9 +1041,6 @@ async def create_group( If True, do not raise an error if the group already exists. attributes : dict, optional Group attributes. - consolidated_metadata : ConsolidatedMetadata, optional - Consolidated Zarr metadata mapping that represents the entire hierarchy's - group and array metadata collected into a single dictionary. Returns ------- @@ -1045,7 +1050,6 @@ async def create_group( return await type(self).from_store( self.store_path / name, attributes=attributes, - consolidated_metadata=consolidated_metadata, overwrite=overwrite, zarr_format=self.metadata.zarr_format, ) @@ -1830,7 +1834,6 @@ def from_store( store: StoreLike, *, attributes: dict[str, Any] | None = None, - consolidated_metadata: ConsolidatedMetadata | None = None, zarr_format: ZarrFormat = 3, overwrite: bool = False, ) -> Group: @@ -1844,8 +1847,6 @@ def from_store( for a description of all valid StoreLike values. attributes : dict, optional A dictionary of JSON-serializable values with user-defined attributes. - consolidated_metadata : ConsolidatedMetadata, optional - Consolidated Metadata for this Group. This should contain metadata of child nodes below this group. zarr_format : {2, 3}, optional Zarr storage format version. overwrite : bool, optional @@ -1865,7 +1866,6 @@ def from_store( AsyncGroup.from_store( store, attributes=attributes, - consolidated_metadata=consolidated_metadata, overwrite=overwrite, zarr_format=zarr_format, ), @@ -2465,7 +2465,6 @@ def create_group( name: str, overwrite: bool = False, attributes: dict[str, Any] | None = None, - consolidated_metadata: ConsolidatedMetadata | None = None, ) -> Group: """Create a sub-group. @@ -2477,9 +2476,6 @@ def create_group( If True, do not raise an error if the group already exists. attributes : dict, optional Group attributes. 
- consolidated_metadata : ConsolidatedMetadata, optional - Consolidated Zarr metadata mapping that represents the entire hierarchy's - group and array metadata collected into a single dictionary. Returns ------- @@ -2499,7 +2495,6 @@ def create_group( name, overwrite=overwrite, attributes=attributes, - consolidated_metadata=consolidated_metadata, ) ) ) From f778bc900d0f78a72a85e3f22f9192c66bcc2b89 Mon Sep 17 00:00:00 2001 From: d-v-b-agent Date: Tue, 23 Jun 2026 14:03:49 +0000 Subject: [PATCH 3/4] feat(copy_to): node-level copy_to (Array + Group), write-order config, bounded streaming Applies lessons from studying tensorstore and zarrs to zarr-python's copy_to: - Factor copy_to to the node level. copy_to is fundamentally a node operation, so add Array.copy_to / AsyncArray.copy_to (copy one array's metadata + stored chunks/shards) and rebuild Group.copy_to on the same shared primitives (_node_metadata_pairs / _array_data_pairs / _copy_keys / _execute_copy in array.py). Byte-level copy with no decode/re-encode -- matching zarrs' fast path and tensorstore's (shipped-disabled) kvstore copy; their array Copy decodes only to convert dtype/codec, which is not what a faithful clone wants. - Add a copy.write_order config point (default "data_first") plus a per-call write_order override. "data_first" copies all chunk data, then metadata, with the root/consolidated metadata written dead last, so an interrupted copy never exposes an index that points at data which is not yet present; "metadata_first" makes the hierarchy browsable mid-copy. Neither tensorstore nor zarrs offers this for copy (zarrs is metadata-first with no commit; tensorstore relies on transactions), but the hazard is real and the safe default is cheap. - Stream the copy with a bounded runner (_copy_keys) capped at async.concurrency, fed by a generator that materializes one array's data keys at a time. 
This removes the whole-hierarchy key materialization (and the per-key task explosion of concurrent_map), bounding memory when copying hierarchies with millions of objects. - Reuse async.concurrency for the I/O bound. Our copy does no codec work, so unlike tensorstore (which splits data_copy_concurrency from file_io_concurrency) only the I/O concurrency knob is relevant; no codec-concurrency knob is added. Tests cover Array.copy_to (faithful copy, path, overwrite, foreign-key/sparse skip), deterministic write-order (recording store: data-before-metadata, root last, and the reverse), config-driven default, invalid value, and the bounded runner (in-flight cap + exception propagation). Co-Authored-By: Claude Opus 4.8 (1M context) --- changes/3612.feature.md | 11 +- docs/user-guide/groups.md | 17 ++- src/zarr/core/array.py | 225 +++++++++++++++++++++++++++++++++++++- src/zarr/core/config.py | 1 + src/zarr/core/group.py | 101 +++++++++-------- tests/test_array.py | 111 ++++++++++++++++++- tests/test_config.py | 1 + tests/test_group.py | 74 +++++++++++++ 8 files changed, 488 insertions(+), 53 deletions(-) diff --git a/changes/3612.feature.md b/changes/3612.feature.md index 821159d418..1bc7a7c282 100644 --- a/changes/3612.feature.md +++ b/changes/3612.feature.md @@ -1,3 +1,8 @@ -Added the convenience method for `zarr.Group` to copy to a destination store which -can be of a different type than the original store of the `zarr.Group` to be -copied. This will also copy over the metadata as is. \ No newline at end of file +Added ``copy_to`` convenience methods to ``zarr.Group`` and ``zarr.Array`` (and their async +counterparts) for copying a node, or a whole group hierarchy, to a destination store that may be +of a different type than the source. The copy is a raw byte-level transfer of the metadata +documents and stored chunks/shards (chunk bytes are not decoded and re-encoded), and reproduces +the source keys exactly, including consolidated metadata. 
The new ``write_order`` argument (and +the ``copy.write_order`` configuration value) controls whether data or metadata is written first, +so an interrupted copy can be made to never expose an index that points at data which is not yet +present. diff --git a/docs/user-guide/groups.md b/docs/user-guide/groups.md index 277a921863..9d372840cd 100644 --- a/docs/user-guide/groups.md +++ b/docs/user-guide/groups.md @@ -134,10 +134,23 @@ print(root.tree()) !!! note [`zarr.Group.tree`][] requires the optional [rich](https://rich.readthedocs.io/en/stable/) dependency. It can be installed with the `[tree]` extra. -You can copy a Group including consolidated metadata to a new destination store -(type of store can differ from the source store) using the `copy_to` method: +You can copy a Group (including consolidated metadata) to a new destination store +(the type of store can differ from the source store) using the `copy_to` method: ```python exec="true" session="groups" source="above" result="ansi" destination_store = zarr.storage.MemoryStore() new_group = root.copy_to(destination_store, overwrite=True) ``` + +This is a raw byte-level copy of the metadata documents and stored chunks/shards: +the chunk bytes are transferred as-is, without being decoded and re-encoded. +[`zarr.Array.copy_to`][] is available too, for copying a single array. + +The `write_order` argument controls what a partially completed (e.g. interrupted) +copy looks like to a reader. With the default `"data_first"`, all chunk data is +written before the metadata, and the root group's metadata (the hierarchy's index) +is written last, so an interrupted copy never exposes an index that points at data +which is not yet present. With `"metadata_first"`, the metadata is written first, so +the hierarchy is browsable while the copy is in progress (chunks that have not been +copied yet read as the array's fill value). 
The default can be set globally with the +`copy.write_order` [runtime configuration](config.md) value. diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index ebc9b38d94..72651d8be1 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -2,8 +2,8 @@ import math import warnings -from asyncio import gather -from collections.abc import Iterable, Mapping, Sequence +from asyncio import FIRST_COMPLETED, Task, ensure_future, gather, wait +from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Mapping, Sequence from dataclasses import dataclass, field, replace from itertools import starmap from logging import getLogger @@ -1796,6 +1796,68 @@ def _info( _count_chunks_initialized=count_chunks_initialized, ) + async def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + write_order: Literal["data_first", "metadata_first"] | None = None, + ) -> AnyAsyncArray: + """ + Copy this array to a new store. + + This performs a raw byte-level copy of the array's metadata and stored chunks/shards, + without decoding or re-encoding the data. Only the data keys defined by the array's + metadata that actually exist in the store are copied, so foreign keys under the array's + prefix and unwritten (sparse) chunks are skipped. Objects are transferred concurrently, + governed by the ``async.concurrency`` configuration value. + + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Array path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination, controlling what a partially + completed (e.g. interrupted) copy looks like to a reader. ``"data_first"`` writes the + chunk/shard data before the array metadata, so the array only becomes openable once its + data is present. 
``"metadata_first"`` writes the metadata before the data, so the array + is openable while the copy is in progress (missing chunks read as ``fill_value``). If + None (default), the ``copy.write_order`` configuration value is used. + + Returns + ------- + AsyncArray + The new array in the target store. + """ + resolved_order = _resolve_write_order(write_order) + target_store_path = await make_store_path(store, path=path or "") + + if overwrite: + await target_store_path.store.delete_dir(target_store_path.path) + else: + await ensure_no_existing_node(target_store_path, zarr_format=self.metadata.zarr_format) + + src_prefix = self.store_path.path + "/" if self.store_path.path else "" + dst_prefix = target_store_path.path + "/" if target_store_path.path else "" + copy_one = _make_key_copier( + self.store_path.store, target_store_path.store, default_buffer_prototype() + ) + + await _execute_copy( + write_order=resolved_order, + copy_one=copy_one, + limit=zarr_config.get("async.concurrency"), + data_pairs=lambda: _array_data_pairs(self, src_prefix, dst_prefix), + child_metadata_pairs=[], + root_metadata_pairs=list(_node_metadata_pairs(self.metadata, src_prefix, dst_prefix)), + ) + return await type(self).open(target_store_path, zarr_format=self.metadata.zarr_format) + # TODO: Array can be a frozen data class again once property setters (e.g. shape) are removed @dataclass(frozen=False) @@ -1944,6 +2006,50 @@ def open( async_array = sync(AsyncArray.open(store)) return cls(async_array) + def copy_to( + self, + store: StoreLike, + *, + path: str | None = None, + overwrite: bool = False, + write_order: Literal["data_first", "metadata_first"] | None = None, + ) -> Array[T_ArrayMetadata]: + """ + Copy this array to a new store. + + This performs a raw byte-level copy of the array's metadata and stored chunks/shards, + without decoding or re-encoding the data. 
Only the data keys defined by the array's + metadata that actually exist in the store are copied, so foreign keys under the array's + prefix and unwritten (sparse) chunks are skipped. + + Parameters + ---------- + store : StoreLike + The store to copy to. + path : str, optional + Array path within the destination store. + overwrite : bool, optional + If True, overwrite any existing data in the target store. Default is False. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination. ``"data_first"`` (the default + behaviour when None, via the ``copy.write_order`` config value) writes chunk/shard data + before the array metadata, so the array only becomes openable once its data is present. + ``"metadata_first"`` writes the metadata first, so the array is openable mid-copy with + missing chunks reading as ``fill_value``. + + Returns + ------- + Array + The new array in the target store. + """ + return type(self)( + sync( + self._async_array.copy_to( + store=store, path=path, overwrite=overwrite, write_order=write_order + ) + ) + ) + @property def store(self) -> Store: return self.async_array.store @@ -5231,6 +5337,121 @@ def _iter_metadata_keys( yield ZATTRS_JSON +def _resolve_write_order( + write_order: Literal["data_first", "metadata_first"] | None, +) -> Literal["data_first", "metadata_first"]: + """Resolve the copy write-order, falling back to the ``copy.write_order`` config value.""" + order = write_order if write_order is not None else zarr_config.get("copy.write_order") + if order not in ("data_first", "metadata_first"): + raise ValueError( + f"Invalid write_order {order!r}; expected 'data_first' or 'metadata_first'." 
+ ) + return cast("Literal['data_first', 'metadata_first']", order) + + +def _make_key_copier( + src_store: Store, dst_store: Store, prototype: BufferPrototype +) -> Callable[[str, str], Awaitable[None]]: + """Build a coroutine that copies the bytes at one key from ``src_store`` to ``dst_store``.""" + + async def copy_one(src_key: str, dst_key: str) -> None: + data = await src_store.get(src_key, prototype=prototype) + if data is not None: + await dst_store.set(dst_key, data) + + return copy_one + + +def _node_metadata_pairs( + metadata: ArrayV2Metadata | ArrayV3Metadata | GroupMetadata, + src_node_prefix: str, + dst_node_prefix: str, +) -> Iterator[tuple[str, str]]: + """Yield ``(source_key, destination_key)`` pairs for a node's metadata documents.""" + for relative_key in _iter_metadata_keys(metadata): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + + +async def _array_data_pairs( + array: AnyAsyncArray, src_node_prefix: str, dst_node_prefix: str +) -> AsyncIterator[tuple[str, str]]: + """ + Yield ``(source_key, destination_key)`` pairs for an array's stored chunks/shards. + + Only the data keys the array's metadata defines that actually exist in the store are produced + (foreign keys and unwritten chunks are skipped). The keys for a single array are materialized + one array at a time, so iterating many arrays through this never holds the whole hierarchy. 
+ """ + for relative_key in await _shards_initialized(array): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + + +async def _aiter_pairs(pairs: Iterable[tuple[str, str]]) -> AsyncIterator[tuple[str, str]]: + """Adapt a synchronous iterable of key pairs to an async iterator.""" + for pair in pairs: + yield pair + + +async def _copy_keys( + pairs: AsyncIterator[tuple[str, str]], + copy_one: Callable[[str, str], Awaitable[None]], + limit: int, +) -> None: + """ + Copy ``(source_key, destination_key)`` pairs with at most ``limit`` transfers in flight. + + Pairs are consumed lazily and at most ``limit`` copy tasks exist at any moment, so neither the + full key set nor one task per key is ever resident. This bounds memory when copying hierarchies + with very large numbers of objects, unlike materializing every key up front. + """ + pending: set[Task[None]] = set() + try: + async for src_key, dst_key in pairs: + if len(pending) >= limit: + done, pending = await wait(pending, return_when=FIRST_COMPLETED) + for task in done: + task.result() # propagate any exception + pending.add(ensure_future(copy_one(src_key, dst_key))) + while pending: + done, pending = await wait(pending, return_when=FIRST_COMPLETED) + for task in done: + task.result() + finally: + for task in pending: + task.cancel() + if pending: + await gather(*pending, return_exceptions=True) + + +async def _execute_copy( + *, + write_order: Literal["data_first", "metadata_first"], + copy_one: Callable[[str, str], Awaitable[None]], + limit: int, + data_pairs: Callable[[], AsyncIterator[tuple[str, str]]], + child_metadata_pairs: list[tuple[str, str]], + root_metadata_pairs: list[tuple[str, str]], +) -> None: + """ + Run a node/hierarchy copy in two ordered phases. 
+ + For ``"data_first"`` all chunk/shard data is copied, then descendant metadata, then the root + node's metadata last of all -- so an interrupted copy never exposes an index (group metadata, + including consolidated metadata) that points at data which is not yet present. For + ``"metadata_first"`` the metadata is written before the data. + """ + if write_order == "data_first": + await _copy_keys(data_pairs(), copy_one, limit) + await _copy_keys(_aiter_pairs(child_metadata_pairs), copy_one, limit) + # The root metadata is the hierarchy's index; write it dead last. + await _copy_keys(_aiter_pairs(root_metadata_pairs), copy_one, limit) + else: + await _copy_keys( + _aiter_pairs([*child_metadata_pairs, *root_metadata_pairs]), copy_one, limit + ) + await _copy_keys(data_pairs(), copy_one, limit) + + def _iter_shard_regions( array: AnyArray | AnyAsyncArray, *, diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index 08d2a50ace..4691057ff9 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -103,6 +103,7 @@ def enable_gpu(self) -> ConfigSet: "sharding_coalesce_max_bytes": 16 << 20, # 16 MiB }, "async": {"concurrency": 10, "timeout": None}, + "copy": {"write_order": "data_first"}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index c2e48834d2..0e4a7794f9 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -27,9 +27,12 @@ FiltersLike, SerializerLike, ShardsLike, - _iter_metadata_keys, + _array_data_pairs, + _execute_copy, + _make_key_copier, + _node_metadata_pairs, _parse_deprecated_compressor, - _shards_initialized, + _resolve_write_order, create_array, ) from zarr.core.attributes import Attributes @@ -46,7 +49,6 @@ NodeType, ShapeLike, ZarrFormat, - concurrent_map, parse_shapelike, ) from zarr.core.config import config @@ -699,13 +701,16 @@ async def copy_to( path: str | None = None, overwrite: bool = False, 
use_consolidated_for_children: bool = True, + write_order: Literal["data_first", "metadata_first"] | None = None, ) -> AsyncGroup: """ Copy this group and all its contents to a new store. - This performs a raw byte-level copy of all data, without decoding or - re-encoding array contents. Objects are transferred concurrently, governed by the - ``async.concurrency`` configuration value. + This performs a raw byte-level copy of all data, without decoding or re-encoding array + contents. Only the data keys an array's metadata defines that exist in the store are + copied, so foreign keys and unwritten (sparse) chunks are skipped. Objects are transferred + concurrently (governed by the ``async.concurrency`` config value) and streamed, so the full + set of keys is never held in memory at once. Parameters ---------- @@ -719,12 +724,21 @@ async def copy_to( Whether to use the consolidated metadata of child groups when iterating over the store contents. Note that this only affects groups loaded from the store. If the current Group already has consolidated metadata, it will always be used. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination, controlling what a partially + completed (e.g. interrupted) copy looks like to a reader. ``"data_first"`` copies all + chunk/shard data first, then metadata, with the root group metadata (the hierarchy's + index, including any consolidated metadata) written dead last -- so an interrupted copy + never exposes an index that points at data which is not yet present. ``"metadata_first"`` + writes metadata before data, so the hierarchy is browsable mid-copy with missing chunks + reading as ``fill_value``. If None (default), the ``copy.write_order`` config value is used. Returns ------- AsyncGroup The new group in the target store. 
""" + resolved_order = _resolve_write_order(write_order) target_store_path = await make_store_path(store, path=path or "") if overwrite: @@ -732,53 +746,42 @@ async def copy_to( else: await ensure_no_existing_node(target_store_path, zarr_format=self.metadata.zarr_format) - src_store = self.store_path.store src_prefix = self.store_path.path + "/" if self.store_path.path else "" - dst_store = target_store_path.store dst_prefix = target_store_path.path + "/" if target_store_path.path else "" + copy_one = _make_key_copier( + self.store_path.store, target_store_path.store, default_buffer_prototype() + ) - prototype = default_buffer_prototype() - concurrency = config.get("async.concurrency") - - # Enumerate every (source, destination) key pair before copying. Discovering the keys - # (listing the source store) is cheap compared with transferring chunk/shard bytes, so we - # build the full work list first and then copy the payloads concurrently below. - copy_pairs: list[tuple[str, str]] = [] - - def _add_node_keys(node_prefix: str, relative_keys: Iterable[str]) -> None: - """Register a node's keys, which are relative to ``node_prefix`` within each node's path.""" - for relative_key in relative_keys: - node_key = f"{node_prefix}{relative_key}" if node_prefix else relative_key - copy_pairs.append((f"{src_prefix}{node_key}", f"{dst_prefix}{node_key}")) - - # The root group's metadata keys. - _add_node_keys("", _iter_metadata_keys(self.metadata)) - - # All children discovered via members(). Collect arrays separately so their stored data - # keys can be listed concurrently rather than one array at a time. - array_members: list[tuple[str, AsyncArray[Any]]] = [] + # Single traversal of the hierarchy. We collect only the metadata key pairs (one small set + # per node) and references to the array nodes -- never the data keys -- so memory stays + # bounded by the number of nodes, not the number of chunks. 
+ root_metadata_pairs = list(_node_metadata_pairs(self.metadata, src_prefix, dst_prefix)) + child_metadata_pairs: list[tuple[str, str]] = [] + array_nodes: list[tuple[AnyAsyncArray, str, str]] = [] async for child_path, member in self.members( max_depth=None, use_consolidated_for_children=use_consolidated_for_children ): - _add_node_keys(f"{child_path}/", _iter_metadata_keys(member.metadata)) + node_src = f"{src_prefix}{child_path}/" + node_dst = f"{dst_prefix}{child_path}/" + child_metadata_pairs.extend(_node_metadata_pairs(member.metadata, node_src, node_dst)) if isinstance(member, AsyncArray): - array_members.append((child_path, member)) - - # Copy only the data keys that each array's metadata defines and that exist in the store. - # This skips both foreign keys under the array prefix and unwritten (sparse) chunks. - initialized_keys = await concurrent_map( - [(member,) for _, member in array_members], _shards_initialized, concurrency + array_nodes.append((member, node_src, node_dst)) + + async def data_pairs() -> AsyncIterator[tuple[str, str]]: + # Each array's data keys are materialized one array at a time, so iterating the whole + # hierarchy's chunks never holds more than a single array's key set. 
+ for array, node_src, node_dst in array_nodes: + async for pair in _array_data_pairs(array, node_src, node_dst): + yield pair + + await _execute_copy( + write_order=resolved_order, + copy_one=copy_one, + limit=config.get("async.concurrency"), + data_pairs=data_pairs, + child_metadata_pairs=child_metadata_pairs, + root_metadata_pairs=root_metadata_pairs, ) - for (child_path, _), data_keys in zip(array_members, initialized_keys, strict=True): - _add_node_keys(f"{child_path}/", data_keys) - - async def _copy_key(src_key: str, dst_key: str) -> None: - """Copy a single key from source to destination store.""" - data = await src_store.get(src_key, prototype=prototype) - if data is not None: - await dst_store.set(dst_key, data) - - await concurrent_map(copy_pairs, _copy_key, concurrency) return await type(self).open(target_store_path, zarr_format=self.metadata.zarr_format) @@ -1905,6 +1908,7 @@ def copy_to( path: str | None = None, overwrite: bool = False, use_consolidated_for_children: bool = True, + write_order: Literal["data_first", "metadata_first"] | None = None, ) -> Group: """ Copy this group and all its contents to a new store. @@ -1921,6 +1925,12 @@ def copy_to( Whether to use the consolidated metadata of child groups when iterating over the store contents. Note that this only affects groups loaded from the store. If the current Group already has consolidated metadata, it will always be used. + write_order : {"data_first", "metadata_first"}, optional + The order in which to write objects to the destination. ``"data_first"`` (the default + behaviour when None, via the ``copy.write_order`` config value) copies all data first, + then metadata, with the root group metadata written dead last, so an interrupted copy + never exposes an index pointing at absent data. ``"metadata_first"`` writes metadata + first, so the hierarchy is browsable mid-copy with missing chunks reading as ``fill_value``. 
Returns ------- @@ -1934,6 +1944,7 @@ def copy_to( path=path, overwrite=overwrite, use_consolidated_for_children=use_consolidated_for_children, + write_order=write_order, ) ) ) diff --git a/tests/test_array.py b/tests/test_array.py index 0d6d2d5906..e7f5b115cf 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -72,7 +72,7 @@ from zarr.core.group import AsyncGroup from zarr.core.indexing import BasicIndexer, _iter_grid, _iter_regions from zarr.core.metadata.v2 import ArrayV2Metadata -from zarr.core.sync import sync +from zarr.core.sync import _collect_aiterator, sync from zarr.errors import ( ContainsArrayError, ContainsGroupError, @@ -2374,3 +2374,112 @@ async def test_create_array_chunks_3d( shape = (10, 12, 15) arr = await create_array(store={}, shape=shape, chunks=chunk_input, dtype="float64") assert arr.write_chunk_sizes == expected + + +def test_array_copy_to(zarr_format: ZarrFormat) -> None: + """Array.copy_to performs a faithful byte-level copy of a single array node.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(30,), chunks=(7,), dtype="int64") + a[:] = np.arange(30) + + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + + dst_store = MemoryStore() + copied = arr.copy_to(dst_store, overwrite=True) + assert isinstance(copied, zarr.Array) + assert np.array_equal(copied[:], np.arange(30)) + + +def test_array_copy_to_with_path(zarr_format: ZarrFormat) -> None: + """Array.copy_to honors a destination path.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(15,), chunks=(4,), dtype="int64") + a[:] = np.arange(15) + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + + dst_store = MemoryStore() + copied = arr.copy_to(dst_store, path="nested/clone", overwrite=True) + assert copied.store_path.path == "nested/clone" + 
assert np.array_equal(copied[:], np.arange(15)) + + +def test_array_copy_to_overwrite_false_raises(zarr_format: ZarrFormat) -> None: + """Array.copy_to with overwrite=False raises when the destination is occupied.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(10,), chunks=(5,), dtype="int64") + a[:] = np.arange(10) + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + + dst_store = MemoryStore() + # Put a group at the destination root so the target location is occupied. + zarr.open_group(dst_store, mode="w", zarr_format=zarr_format) + with pytest.raises((ContainsArrayError, ContainsGroupError)): + arr.copy_to(dst_store, overwrite=False) + + +def test_array_copy_to_skips_foreign_and_unwritten(zarr_format: ZarrFormat) -> None: + """Array.copy_to copies only defined, written data keys; foreign keys and gaps are skipped.""" + src_store = MemoryStore() + g = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + a = g.create_array("dataset", shape=(100,), chunks=(10,), dtype="int64") + a[0:10] = np.arange(10) # only the first chunk written -> sparse + # Plant a foreign key under the array prefix. + sync( + src_store.set("dataset/not_a_chunk", default_buffer_prototype().buffer.from_bytes(b"junk")) + ) + + arr = zarr.open_group(src_store)["dataset"] + assert isinstance(arr, zarr.Array) + dst_store = MemoryStore() + arr.copy_to(dst_store, overwrite=True) + + dst_keys = set(sync(_collect_aiterator(dst_store.list()))) + # The foreign key (which would relativize to "not_a_chunk") was not copied. + assert "not_a_chunk" not in dst_keys + # The one written chunk round-trips; the array is faithfully reproduced. 
+ copied = zarr.open_array(dst_store) + assert np.array_equal(copied[0:10], np.arange(10)) + + +def test_copy_keys_bounds_in_flight_transfers() -> None: + """_copy_keys streams pairs and keeps at most ``limit`` transfers in flight.""" + import asyncio + + from zarr.core.array import _aiter_pairs, _copy_keys + + limit = 4 + state = {"in_flight": 0, "max_in_flight": 0} + + async def copy_one(src_key: str, dst_key: str) -> None: + state["in_flight"] += 1 + state["max_in_flight"] = max(state["max_in_flight"], state["in_flight"]) + await asyncio.sleep(0) + state["in_flight"] -= 1 + + pairs = [(f"s{i}", f"d{i}") for i in range(50)] + sync(_copy_keys(_aiter_pairs(pairs), copy_one, limit)) + + assert state["max_in_flight"] <= limit + assert state["max_in_flight"] > 1 # concurrency was actually exercised + + +def test_copy_keys_propagates_exceptions() -> None: + """_copy_keys surfaces an exception raised by an individual transfer.""" + import asyncio + + from zarr.core.array import _aiter_pairs, _copy_keys + + async def copy_one(src_key: str, dst_key: str) -> None: + await asyncio.sleep(0) + if src_key == "s3": + raise RuntimeError("boom") + + pairs = [(f"s{i}", f"d{i}") for i in range(10)] + with pytest.raises(RuntimeError, match="boom"): + sync(_copy_keys(_aiter_pairs(pairs), copy_one, 4)) diff --git a/tests/test_config.py b/tests/test_config.py index a758378dc7..344bcfb823 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -61,6 +61,7 @@ def test_config_defaults_set() -> None: "sharding_coalesce_max_bytes": 16 << 20, }, "async": {"concurrency": 10, "timeout": None}, + "copy": {"write_order": "data_first"}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/tests/test_group.py b/tests/test_group.py index 2defce6ff8..c99c0d81b4 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -540,6 +540,80 @@ def test_copy_to_skips_unwritten_chunks(zarr_format: ZarrFormat) -> None: assert 
set(sync(_collect_aiterator(dst_store.list()))) == src_keys +class _RecordingMemoryStore(MemoryStore): + """A MemoryStore that records the order in which keys are written via ``set``.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.set_order: list[str] = [] + + async def set(self, key: str, value: Any, byte_range: Any = None) -> None: + await super().set(key, value, byte_range) + self.set_order.append(key) + + +def _is_metadata_key(key: str) -> bool: + return key.rsplit("/", 1)[-1] in ("zarr.json", ".zarray", ".zgroup", ".zattrs", ".zmetadata") + + +def _make_copy_source(zarr_format: ZarrFormat) -> Group: + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + arr = src.create_array("dataset", shape=(20,), chunks=(5,), dtype="int64") + arr[:] = np.arange(20) + return src + + +def test_copy_to_data_first_writes_data_then_metadata(zarr_format: ZarrFormat) -> None: + """data_first: every data object is written before any metadata, and root metadata is last.""" + src = _make_copy_source(zarr_format) + dst_store = _RecordingMemoryStore() + src.copy_to(dst_store, overwrite=True, write_order="data_first") + + order = dst_store.set_order + data_idx = [i for i, k in enumerate(order) if not _is_metadata_key(k)] + meta_idx = [i for i, k in enumerate(order) if _is_metadata_key(k)] + assert data_idx + assert meta_idx + # All data precedes all metadata. + assert max(data_idx) < min(meta_idx) + # The root group's metadata document (the hierarchy index) is written dead last. 
+ assert _is_metadata_key(order[-1]) + assert "/" not in order[-1] + + +def test_copy_to_metadata_first_writes_metadata_then_data(zarr_format: ZarrFormat) -> None: + """metadata_first: all metadata is written before any data object.""" + src = _make_copy_source(zarr_format) + dst_store = _RecordingMemoryStore() + src.copy_to(dst_store, overwrite=True, write_order="metadata_first") + + order = dst_store.set_order + data_idx = [i for i, k in enumerate(order) if not _is_metadata_key(k)] + meta_idx = [i for i, k in enumerate(order) if _is_metadata_key(k)] + assert data_idx + assert meta_idx + assert max(meta_idx) < min(data_idx) + + +def test_copy_to_default_write_order_follows_config(zarr_format: ZarrFormat) -> None: + """With no write_order argument, copy_to honors the copy.write_order config value.""" + src = _make_copy_source(zarr_format) + dst_store = _RecordingMemoryStore() + with zarr_config.set({"copy.write_order": "metadata_first"}): + src.copy_to(dst_store, overwrite=True) + order = dst_store.set_order + data_idx = [i for i, k in enumerate(order) if not _is_metadata_key(k)] + meta_idx = [i for i, k in enumerate(order) if _is_metadata_key(k)] + assert max(meta_idx) < min(data_idx) + + +def test_copy_to_invalid_write_order_raises(zarr_format: ZarrFormat) -> None: + src = zarr.open_group(MemoryStore(), mode="w", zarr_format=zarr_format) + with pytest.raises(ValueError, match="write_order"): + src.copy_to(MemoryStore(), overwrite=True, write_order="sideways") # type: ignore[arg-type] + + def test_group(store: Store, zarr_format: ZarrFormat) -> None: """ Test basic Group routines. From 8873da59b07f8e5c2eaf360fadab2db4bfd3c00a Mon Sep 17 00:00:00 2001 From: d-v-b-agent Date: Tue, 23 Jun 2026 16:31:50 +0000 Subject: [PATCH 4/4] feat(chunk-keys): is_chunk_key API; stream copy_to for arbitrarily large arrays copy_to previously enumerated each array's full theoretical chunk grid (via _shards_initialized) to decide which keys to copy. 
That is O(nchunks) in time and memory, so a sparse array with a huge grid -- e.g. one opened from another implementation -- could not be copied. Replace that with a streaming filter: - Add ChunkKeyEncoding.is_chunk_key(key, grid_shape) and supports_decode(), plus ArrayV2Metadata.is_chunk_key / ArrayV3Metadata.is_chunk_key, which recognize whether a store key is a valid, canonical, in-bounds chunk/shard key for an array in O(1) -- no grid enumeration. The stored-object grid is the metadata's own top-level chunk grid (a sharded array's grid already counts shards), exposed as ArrayV3Metadata._stored_object_grid_shape; v2 adds a chunk_key_encoding property for uniform access. - copy_to's _array_data_pairs now streams store.list_prefix and keeps only keys that pass metadata.is_chunk_key, so the chunk grid is never materialized and memory stays bounded regardless of array size. Custom encodings that cannot decode keys fall back to grid enumeration. - Fix DefaultChunkKeyEncoding.decode_chunk_key, which was a debug-only stub that raised on valid keys for the standard "/" separator (it stripped only the leading "c", not the following separator). It now round-trips for both separators, which is_chunk_key relies on for its canonical-form check. Tests cover the encoding (round-trip, canonical/bounds/foreign/scalar/sharded rejection) and an end-to-end copy of an array with a ~1e12 chunk grid that completes instantly because it streams instead of enumerating. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- changes/3612.feature.md | 6 ++ src/zarr/core/array.py | 27 +++++++-- src/zarr/core/chunk_key_encodings.py | 60 ++++++++++++++++++- src/zarr/core/metadata/v2.py | 23 +++++++- src/zarr/core/metadata/v3.py | 32 ++++++++++ tests/test_chunk_key_encodings.py | 87 ++++++++++++++++++++++++++++ tests/test_group.py | 41 +++++++++++++ 7 files changed, 270 insertions(+), 6 deletions(-) create mode 100644 tests/test_chunk_key_encodings.py diff --git a/changes/3612.feature.md b/changes/3612.feature.md index 1bc7a7c282..734d6b7111 100644 --- a/changes/3612.feature.md +++ b/changes/3612.feature.md @@ -6,3 +6,9 @@ the source keys exactly, including consolidated metadata. The new ``write_order` the ``copy.write_order`` configuration value) controls whether data or metadata is written first, so an interrupted copy can be made to never expose an index that points at data which is not yet present. + +The copy streams the source store rather than materializing the chunk grid, so it works on arrays +with arbitrarily many chunks in bounded memory. This is supported by a new +``ChunkKeyEncoding.is_chunk_key`` method (and ``ArrayV2Metadata.is_chunk_key`` / +``ArrayV3Metadata.is_chunk_key``) that recognizes whether a store key is a valid chunk/shard key +for an array in O(1), without enumerating the grid. diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 72651d8be1..60be7e86be 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -5379,11 +5379,30 @@ async def _array_data_pairs( Yield ``(source_key, destination_key)`` pairs for an array's stored chunks/shards. Only the data keys the array's metadata defines that actually exist in the store are produced - (foreign keys and unwritten chunks are skipped). The keys for a single array are materialized - one array at a time, so iterating many arrays through this never holds the whole hierarchy. + (foreign keys and unwritten chunks are skipped). 
+ + When the array's chunk key encoding can decode keys (the built-in encodings can), the source + store is *streamed*: keys are listed lazily and each is recognized in O(1) via + ``metadata.is_chunk_key``, so the chunk grid is never materialized. This keeps memory bounded + when copying arrays with arbitrarily many chunks. Custom encodings that cannot decode keys fall + back to enumerating the grid via [`_shards_initialized`][zarr.core.array._shards_initialized]. """ - for relative_key in await _shards_initialized(array): - yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + metadata = array.metadata + if metadata.chunk_key_encoding.supports_decode(): + store = array.store_path.store + array_path = array.store_path.path + child_prefix = f"{array_path}/" if array_path else "" + async for stored_key in store.list_prefix(array_path): + # list_prefix matches on string prefix, so guard against sibling nodes whose path + # starts with this array's path (e.g. "foo" vs "foobar"). 
+ if child_prefix and not stored_key.startswith(child_prefix): + continue + relative_key = stored_key[len(child_prefix) :] + if metadata.is_chunk_key(relative_key): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") + else: + for relative_key in await _shards_initialized(array): + yield (f"{src_node_prefix}{relative_key}", f"{dst_node_prefix}{relative_key}") async def _aiter_pairs(pairs: Iterable[tuple[str, str]]) -> AsyncIterator[tuple[str, str]]: diff --git a/src/zarr/core/chunk_key_encodings.py b/src/zarr/core/chunk_key_encodings.py index 098f2c8981..5fe3bd7f46 100644 --- a/src/zarr/core/chunk_key_encodings.py +++ b/src/zarr/core/chunk_key_encodings.py @@ -54,6 +54,59 @@ def decode_chunk_key(self, chunk_key: str) -> tuple[int, ...]: """ raise NotImplementedError(f"{self.__class__.__name__} does not implement decode_chunk_key.") + def supports_decode(self) -> bool: + """ + Whether this encoding can decode chunk keys back into coordinates. + + True when ``decode_chunk_key`` is overridden (as it is for the built-in ``default`` and + ``v2`` encodings). Callers that need to recognize chunk keys -- e.g. to validate or filter + store keys without enumerating the whole chunk grid -- can use this to decide whether + [`is_chunk_key`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.is_chunk_key] is usable. + """ + return type(self).decode_chunk_key is not ChunkKeyEncoding.decode_chunk_key + + def is_chunk_key(self, key: str, *, grid_shape: tuple[int, ...]) -> bool: + """ + Return whether ``key`` is a valid chunk key for a chunk grid of shape ``grid_shape``. + + A key is valid when it is a well-formed, *canonical* encoding of chunk coordinates of the + right dimensionality (``len(grid_shape)``) whose coordinates all fall within the grid + (``0 <= coord < grid_shape[dim]``). 
Canonical means the key round-trips: + ``encode_chunk_key(decode_chunk_key(key)) == key``, so non-canonical strings such as a + zero-padded ``"c/01"`` are rejected, as are metadata documents and foreign keys that happen + to share the array's prefix. + + This is O(1) in the number of stored chunks, so it can classify store keys discovered by + listing without materializing the chunk grid -- the key primitive for streaming operations + over arrays with very large numbers of chunks. Requires + [`supports_decode`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.supports_decode]; raises + ``NotImplementedError`` otherwise. + + Parameters + ---------- + key : str + The candidate chunk key, relative to the array's path. + grid_shape : tuple[int, ...] + The number of chunks along each dimension of the (top-level / stored-object) chunk + grid. For a sharded array this is the shard grid, since shards are the stored objects. + + Returns + ------- + bool + """ + ndim = len(grid_shape) + # A zero-dimensional array stores its single chunk under a special key ("c" for the + # default encoding, "0" for v2) that may not decode back to an empty coordinate tuple. + if ndim == 0: + return key == self.encode_chunk_key(()) + try: + coords = self.decode_chunk_key(key) + except (ValueError, IndexError): + return False + if len(coords) != ndim or self.encode_chunk_key(coords) != key: + return False + return all(0 <= coord < size for coord, size in zip(coords, grid_shape, strict=True)) + @abstractmethod def encode_chunk_key(self, chunk_coords: tuple[int, ...]) -> str: """ @@ -79,7 +132,12 @@ def __post_init__(self) -> None: def decode_chunk_key(self, chunk_key: str) -> tuple[int, ...]: if chunk_key == "c": return () - return tuple(map(int, chunk_key[1:].split(self.separator))) + # Keys are "c" followed by the separator and the separator-joined coordinates, + # e.g. "c/0/1" or "c.0.1". Strip the leading "c" and separator before splitting.
+ prefix = "c" + self.separator + if not chunk_key.startswith(prefix): + raise ValueError(f"Invalid chunk key {chunk_key!r} for {self.name} encoding.") + return tuple(int(part) for part in chunk_key[len(prefix) :].split(self.separator)) def encode_chunk_key(self, chunk_coords: tuple[int, ...]) -> str: return self.separator.join(map(str, ("c",) + chunk_coords)) diff --git a/src/zarr/core/metadata/v2.py b/src/zarr/core/metadata/v2.py index ac32521239..1d496ba544 100644 --- a/src/zarr/core/metadata/v2.py +++ b/src/zarr/core/metadata/v2.py @@ -32,12 +32,13 @@ from zarr.core._json import json_to_buffer from zarr.core.array_spec import ArrayConfig, ArraySpec -from zarr.core.chunk_key_encodings import parse_separator +from zarr.core.chunk_key_encodings import ChunkKeyEncoding, V2ChunkKeyEncoding, parse_separator from zarr.core.common import ( JSON, ZARRAY_JSON, ZATTRS_JSON, MemoryOrder, + ceildiv, parse_shapelike, ) from zarr.core.config import config, parse_indexing_order @@ -116,6 +117,26 @@ def __init__( def ndim(self) -> int: return len(self.shape) + @property + def chunk_key_encoding(self) -> ChunkKeyEncoding: + """The chunk key encoding for this array (a v2 encoding using its dimension separator).""" + return V2ChunkKeyEncoding(separator=self.dimension_separator) + + def is_chunk_key(self, key: str) -> bool: + """ + Return whether ``key`` (relative to the array's path) is a valid storage key for one of + this array's stored chunks. + + A key is valid when it is a well-formed, canonical, in-bounds chunk key per this array's + chunk key encoding and grid (see + [`ChunkKeyEncoding.is_chunk_key`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.is_chunk_key]). + Metadata documents and foreign keys that share the array's prefix return False. This is + O(1) in the number of chunks, so store keys can be classified by listing rather than by + enumerating the chunk grid. 
+ """ + grid_shape = tuple(ceildiv(s, c) for s, c in zip(self.shape, self.chunks, strict=True)) + return self.chunk_key_encoding.is_chunk_key(key, grid_shape=grid_shape) + @cached_property def chunk_grid(self) -> ChunkGrid: """Backwards-compatible chunk grid property. diff --git a/src/zarr/core/metadata/v3.py b/src/zarr/core/metadata/v3.py index 9eaccc5076..78709c9292 100644 --- a/src/zarr/core/metadata/v3.py +++ b/src/zarr/core/metadata/v3.py @@ -24,6 +24,7 @@ DimensionNamesLike, NamedConfig, NamedRequiredConfig, + ceildiv, compress_rle, expand_rle, parse_named_configuration, @@ -557,6 +558,37 @@ def _validate_metadata(self) -> None: def ndim(self) -> int: return len(self.shape) + @property + def _stored_object_grid_shape(self) -> tuple[int, ...]: + """ + The number of stored objects (chunks, or shards when sharding) along each dimension. + + This is the top-level chunk grid described by ``chunk_grid``: for a sharded array each + top-level chunk is stored as one shard, so this counts shards, matching the keys produced + by ``chunk_key_encoding.encode_chunk_key``. + """ + grid = self.chunk_grid + if isinstance(grid, RegularChunkGridMetadata): + return tuple(ceildiv(s, c) for s, c in zip(self.shape, grid.chunk_shape, strict=True)) + return tuple( + len(chunk_sizes) if isinstance(chunk_sizes, tuple) else ceildiv(extent, chunk_sizes) + for extent, chunk_sizes in zip(self.shape, grid.chunk_shapes, strict=True) + ) + + def is_chunk_key(self, key: str) -> bool: + """ + Return whether ``key`` (relative to the array's path) is a valid storage key for one of + this array's stored chunks/shards. + + A key is valid when it is a well-formed, canonical, in-bounds chunk key per this array's + chunk key encoding and grid (see + [`ChunkKeyEncoding.is_chunk_key`][zarr.core.chunk_key_encodings.ChunkKeyEncoding.is_chunk_key]). + Metadata documents and foreign keys that share the array's prefix return False. 
This is + O(1) in the number of chunks, so store keys can be classified by listing rather than by + enumerating the chunk grid. + """ + return self.chunk_key_encoding.is_chunk_key(key, grid_shape=self._stored_object_grid_shape) + @property def dtype(self) -> ZDType[TBaseDType, TBaseScalar]: return self.data_type diff --git a/tests/test_chunk_key_encodings.py b/tests/test_chunk_key_encodings.py new file mode 100644 index 0000000000..0f70fc6d2e --- /dev/null +++ b/tests/test_chunk_key_encodings.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import pytest + +import zarr +from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding +from zarr.storage import MemoryStore + + +@pytest.mark.parametrize("separator", [".", "/"]) +@pytest.mark.parametrize("coords", [(), (0,), (3,), (1, 2), (10, 0, 7)]) +def test_default_encoding_round_trips(separator: str, coords: tuple[int, ...]) -> None: + enc = DefaultChunkKeyEncoding(separator=separator) # type: ignore[arg-type] + key = enc.encode_chunk_key(coords) + assert enc.decode_chunk_key(key) == coords + + +@pytest.mark.parametrize("separator", [".", "/"]) +@pytest.mark.parametrize("coords", [(0,), (3,), (1, 2), (10, 0, 7)]) +def test_v2_encoding_round_trips(separator: str, coords: tuple[int, ...]) -> None: + enc = V2ChunkKeyEncoding(separator=separator) # type: ignore[arg-type] + key = enc.encode_chunk_key(coords) + assert enc.decode_chunk_key(key) == coords + + +@pytest.mark.parametrize( + "encoding", + [DefaultChunkKeyEncoding(separator="/"), V2ChunkKeyEncoding(separator=".")], +) +def test_supports_decode(encoding: DefaultChunkKeyEncoding | V2ChunkKeyEncoding) -> None: + assert encoding.supports_decode() is True + + +def test_default_decode_rejects_garbage() -> None: + enc = DefaultChunkKeyEncoding(separator="/") + for bad in ["zarr.json", "c/x", "0/0", "cc/0", "c/"]: + with pytest.raises(ValueError): + enc.decode_chunk_key(bad) + + +def test_encoding_is_chunk_key_grid_bounds() -> None: + 
enc = DefaultChunkKeyEncoding(separator="/") + grid = (4, 3) + assert enc.is_chunk_key("c/0/0", grid_shape=grid) + assert enc.is_chunk_key("c/3/2", grid_shape=grid) # last cell + assert not enc.is_chunk_key("c/4/0", grid_shape=grid) # row out of bounds + assert not enc.is_chunk_key("c/0/3", grid_shape=grid) # col out of bounds + assert not enc.is_chunk_key("c/0", grid_shape=grid) # wrong dimensionality + assert not enc.is_chunk_key("c/00/0", grid_shape=grid) # non-canonical (zero padded) + assert not enc.is_chunk_key("c/-1/0", grid_shape=grid) # negative + assert not enc.is_chunk_key("zarr.json", grid_shape=grid) # metadata document + assert not enc.is_chunk_key("foreign", grid_shape=grid) # foreign key + + +def test_encoding_is_chunk_key_scalar() -> None: + # A zero-dimensional array stores its single chunk under a special key. + assert DefaultChunkKeyEncoding(separator="/").is_chunk_key("c", grid_shape=()) + assert not DefaultChunkKeyEncoding(separator="/").is_chunk_key("c/0", grid_shape=()) + assert V2ChunkKeyEncoding(separator=".").is_chunk_key("0", grid_shape=()) + + +@pytest.mark.parametrize("zarr_format", [2, 3]) +def test_metadata_is_chunk_key(zarr_format: int) -> None: + g = zarr.open_group(MemoryStore(), mode="w", zarr_format=zarr_format) + arr = g.create_array("d", shape=(100, 80), chunks=(10, 20), dtype="int64") + m = arr.metadata + enc = m.chunk_key_encoding + + assert m.is_chunk_key(enc.encode_chunk_key((0, 0))) + assert m.is_chunk_key(enc.encode_chunk_key((9, 3))) # last chunk: ceil(100/10)-1, ceil(80/20)-1 + assert not m.is_chunk_key(enc.encode_chunk_key((10, 0))) # out of bounds + assert not m.is_chunk_key(enc.encode_chunk_key((0,))) # wrong dimensionality + assert not m.is_chunk_key("zarr.json") + assert not m.is_chunk_key(".zarray") + assert not m.is_chunk_key("not_a_chunk_key") + + +def test_metadata_is_chunk_key_sharded() -> None: + # For a sharded array the stored objects are shards, keyed over the shard grid. 
+ g = zarr.open_group(MemoryStore(), mode="w", zarr_format=3) + arr = g.create_array("s", shape=(100,), chunks=(10,), shards=(50,), dtype="int64") + m = arr.metadata + assert m._stored_object_grid_shape == (2,) # ceil(100 / 50) shards + enc = m.chunk_key_encoding + assert m.is_chunk_key(enc.encode_chunk_key((0,))) + assert m.is_chunk_key(enc.encode_chunk_key((1,))) + assert not m.is_chunk_key(enc.encode_chunk_key((2,))) # only 2 shards exist diff --git a/tests/test_group.py b/tests/test_group.py index c99c0d81b4..ec68c02f5f 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -614,6 +614,47 @@ def test_copy_to_invalid_write_order_raises(zarr_format: ZarrFormat) -> None: src.copy_to(MemoryStore(), overwrite=True, write_order="sideways") # type: ignore[arg-type] +def test_copy_to_streams_array_with_huge_grid(zarr_format: ZarrFormat) -> None: + """ + copy_to streams stored keys rather than enumerating the chunk grid. + + We build a sparse array whose chunk grid is astronomically large (~1e12 chunks) by writing a + small array and then rewriting its stored metadata to claim a huge shape -- the situation that + arises when opening an array written by another implementation. ``create_array`` itself does + work proportional to the chunk count, so the array is constructed via metadata surgery. Any + copy that materialized the chunk grid would hang here; only a streaming copy (list stored keys + + O(1) recognition) can reproduce exactly the keys that exist. + """ + huge = 1_000_000_000_000 + src_store = MemoryStore() + src = zarr.open_group(src_store, mode="w", zarr_format=zarr_format) + arr = src.create_array("dataset", shape=(4,), chunks=(1,), dtype="int64") + arr[0] = 10 + arr[3] = 13 # chunk index 3 stays in-bounds after the shape grows + + # Rewrite the stored metadata to claim a huge shape, leaving the two chunks in place. 
+ proto = default_buffer_prototype() + meta_key = "dataset/zarr.json" if zarr_format == 3 else "dataset/.zarray" + raw = sync(src_store.get(meta_key, prototype=proto)) + assert raw is not None + doc = json.loads(raw.to_bytes()) + doc["shape"] = [huge] + sync(src_store.set(meta_key, proto.buffer.from_bytes(json.dumps(doc).encode()))) + + reopened = zarr.open_group(src_store) + src_keys = set(sync(_collect_aiterator(src_store.list()))) + + dst_store = MemoryStore() + reopened.copy_to(dst_store, overwrite=True) + + # A faithful copy reproduces exactly the keys that exist in the source. + assert set(sync(_collect_aiterator(dst_store.list()))) == src_keys + copied = zarr.open_group(dst_store)["dataset"] + assert isinstance(copied, Array) + assert copied.shape == (huge,) + assert copied[3] == 13 + + def test_group(store: Store, zarr_format: ZarrFormat) -> None: """ Test basic Group routines.