Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 67 additions & 34 deletions src/zarr/storage/_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

ZipStoreAccessModeLiteral = Literal["r", "w", "a"]

# Sentinel value written to a zip entry to mark it as soft-deleted.
# The ZIP format does not support native deletion, so we overwrite the entry
# with this value and treat it as absent in all read/list operations.
_SOFT_DELETE_SENTINEL = b""


class ZipStore(Store):
"""
Expand Down Expand Up @@ -52,10 +57,18 @@ class ZipStore(Store):
path
compression
allowZip64

Notes
-----
Deletion is implemented as a soft-delete: the zip entry is overwritten with
an empty byte string (b""). All read, exists, and list operations
filter out soft-deleted entries so they appear absent to callers. Because
the ZIP format does not allow removing entries, soft-deleted entries remain
on disk but are invisible through the store API.
"""

supports_writes: bool = True
supports_deletes: bool = False
supports_deletes: bool = True # soft-delete via empty-byte overwrite
supports_listing: bool = True

path: Path
Expand Down Expand Up @@ -153,23 +166,26 @@ def _get(
self._sync_open()
# docstring inherited
try:
with self._zf.open(key) as f: # will raise KeyError
if byte_range is None:
return prototype.buffer.from_bytes(f.read())
elif isinstance(byte_range, RangeByteRequest):
f.seek(byte_range.start)
return prototype.buffer.from_bytes(f.read(byte_range.end - f.tell()))
size = f.seek(0, os.SEEK_END)
if isinstance(byte_range, OffsetByteRequest):
f.seek(byte_range.offset)
elif isinstance(byte_range, SuffixByteRequest):
f.seek(max(0, size - byte_range.suffix))
else:
raise TypeError(f"Unexpected byte_range, got {byte_range}.")
return prototype.buffer.from_bytes(f.read())
with self._zf.open(key) as f:
data = f.read()
except KeyError:
return None

# Treat soft-deleted entries (empty bytes) as missing
if data == _SOFT_DELETE_SENTINEL:
return None

if byte_range is None:
return prototype.buffer.from_bytes(data)
elif isinstance(byte_range, RangeByteRequest):
return prototype.buffer.from_bytes(data[byte_range.start : byte_range.end])
elif isinstance(byte_range, OffsetByteRequest):
return prototype.buffer.from_bytes(data[byte_range.offset :])
elif isinstance(byte_range, SuffixByteRequest):
return prototype.buffer.from_bytes(data[max(0, len(data) - byte_range.suffix) :])
else:
raise TypeError(f"Unexpected byte_range, got {byte_range}.")

async def get(
self,
key: str,
Expand Down Expand Up @@ -227,21 +243,31 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None:
if key not in members:
self._set(key, value)

async def delete_dir(self, prefix: str) -> None:
# only raise NotImplementedError if any keys are found
async def delete(self, key: str) -> None:
# docstring inherited
# Soft-delete: overwrite the entry with an empty byte sentinel.
# The ZIP format has no native delete API, so we mark the entry as
# deleted by writing b"" and filtering it out in all read/list paths.
# If the key does not exist in the archive, this is a no-op.
self._check_writable()
if prefix != "" and not prefix.endswith("/"):
prefix += "/"
async for _ in self.list_prefix(prefix):
raise NotImplementedError
with self._lock:
if key in self._zf.namelist():
keyinfo = zipfile.ZipInfo(
filename=key, date_time=time.localtime(time.time())[:6]
)
keyinfo.compress_type = self.compression
keyinfo.external_attr = 0o644 << 16 # ?rw-r--r--
self._zf.writestr(keyinfo, _SOFT_DELETE_SENTINEL)

async def delete(self, key: str) -> None:
async def delete_dir(self, prefix: str) -> None:
# docstring inherited
# we choose to only raise NotImplementedError here if the key exists
# this allows the array/group APIs to avoid the overhead of existence checks
# Collect all live keys under the prefix first, then soft-delete each.
self._check_writable()
if await self.exists(key):
raise NotImplementedError
if prefix != "" and not prefix.endswith("/"):
prefix += "/"
keys_to_delete = [key async for key in self.list_prefix(prefix)]
for key in keys_to_delete:
await self.delete(key)

async def exists(self, key: str) -> bool:
# docstring inherited
Expand All @@ -252,16 +278,26 @@ async def exists(self, key: str) -> bool:
self._zf.getinfo(key)
except KeyError:
return False
else:
return True
# Key physically exists — check it hasn't been soft-deleted
with self._zf.open(key) as f:
return f.read() != _SOFT_DELETE_SENTINEL

async def list(self) -> AsyncIterator[str]:
# docstring inherited
if not self._is_open:
self._sync_open()
with self._lock:
seen: set[str] = set()
for key in self._zf.namelist():
yield key
if key in seen:
continue
seen.add(key)
try:
with self._zf.open(key) as f:
if f.read() != _SOFT_DELETE_SENTINEL:
yield key
except KeyError:
pass

async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
# docstring inherited
Expand All @@ -274,8 +310,7 @@ async def list_dir(self, prefix: str) -> AsyncIterator[str]:
if not self._is_open:
self._sync_open()
prefix = prefix.rstrip("/")

keys = self._zf.namelist()
keys = [k async for k in self.list()]
seen = set()
if prefix == "":
keys_unique = {k.split("/")[0] for k in keys}
Expand All @@ -292,9 +327,7 @@ async def list_dir(self, prefix: str) -> AsyncIterator[str]:
yield k

async def move(self, path: Path | str) -> None:
"""
Move the store to another path.
"""
"""Move the store to another path."""
if isinstance(path, str):
path = Path(path)
self.close()
Expand Down
10 changes: 2 additions & 8 deletions tests/test_store/test_stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,16 @@ def test_zarr_hierarchy(sync_store: Store) -> None:
def mk_test_instance_sync() -> ZarrHierarchyStateMachine:
return ZarrHierarchyStateMachine(sync_store)

if isinstance(sync_store, ZipStore):
pytest.skip(reason="ZipStore does not support delete")

# ZipStore now supports soft-delete — skip removed
run_state_machine_as_test(mk_test_instance_sync) # type: ignore[no-untyped-call]


def test_zarr_store(sync_store: Store) -> None:
def mk_test_instance_sync() -> ZarrStoreStateMachine:
return ZarrStoreStateMachine(sync_store)

if isinstance(sync_store, ZipStore):
pytest.skip(reason="ZipStore does not support delete")
# ZipStore now supports soft-delete — skip removed

if isinstance(sync_store, LocalStore):
# This test uses arbitrary keys, which are passed to `set` and `delete`.
# It assumes that `set` and `delete` are the only two operations that modify state.
# But LocalStore, directories can hang around even after a key is delete-d.
pytest.skip(reason="Test isn't suitable for LocalStore.")
run_state_machine_as_test(mk_test_instance_sync) # type: ignore[no-untyped-call]
103 changes: 92 additions & 11 deletions tests/test_store/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,97 @@ def test_store_repr(self, store: ZipStore) -> None:
def test_store_supports_writes(self, store: ZipStore) -> None:
assert store.supports_writes

def test_store_supports_deletes(self, store: ZipStore) -> None:
assert store.supports_deletes

def test_store_supports_listing(self, store: ZipStore) -> None:
assert store.supports_listing

# TODO: fix this warning
# ------------------------------------------------------------------
# delete() tests
# ------------------------------------------------------------------

async def test_delete_makes_key_inaccessible(self, store: ZipStore) -> None:
key = "chunk/0/0"
value = cpu.Buffer.from_bytes(b"hello zarr")

await store.set(key, value)
assert await store.exists(key)

await store.delete(key)

assert not await store.exists(key)
result = await store.get(key, prototype=default_buffer_prototype())
assert result is None
listed = [k async for k in store.list()]
assert key not in listed

async def test_delete_nonexistent_key_is_noop(self, store: ZipStore) -> None:
await store.delete("does/not/exist") # must not raise

async def test_delete_does_not_affect_other_keys(self, store: ZipStore) -> None:
await store.set("a", cpu.Buffer.from_bytes(b"aaa"))
await store.set("b", cpu.Buffer.from_bytes(b"bbb"))

await store.delete("a")

assert not await store.exists("a")
assert await store.exists("b")
buf = await store.get("b", prototype=default_buffer_prototype())
assert buf is not None
assert buf.to_bytes() == b"bbb"

async def test_delete_already_deleted_key_is_noop(self, store: ZipStore) -> None:
key = "redundant/key"
await store.set(key, cpu.Buffer.from_bytes(b"data"))
await store.delete(key)
await store.delete(key) # second delete — no-op
assert not await store.exists(key)

# ------------------------------------------------------------------
# delete_dir() tests
# ------------------------------------------------------------------

async def test_delete_dir_removes_all_keys_under_prefix(self, store: ZipStore) -> None:
keys_under = ["prefix/a", "prefix/b", "prefix/sub/c"]
key_other = "other/d"

for key in keys_under + [key_other]:
await store.set(key, cpu.Buffer.from_bytes(b"data"))

await store.delete_dir("prefix")

listed = [k async for k in store.list()]
for key in keys_under:
assert key not in listed
assert not await store.exists(key)
assert key_other in listed

async def test_delete_dir_empty_prefix_is_noop(self, store: ZipStore) -> None:
await store.set("unrelated/key", cpu.Buffer.from_bytes(b"data"))
await store.delete_dir("nonexistent_prefix")
assert await store.exists("unrelated/key")

# ------------------------------------------------------------------
# list() soft-delete filtering
# ------------------------------------------------------------------

async def test_list_excludes_soft_deleted_keys(self, store: ZipStore) -> None:
for key in ["a", "b", "c"]:
await store.set(key, cpu.Buffer.from_bytes(b"x"))

await store.delete("b")

listed = [k async for k in store.list()]
assert "a" in listed
assert "b" not in listed
assert "c" in listed

# ------------------------------------------------------------------
# Updated integration test
# ------------------------------------------------------------------

@pytest.mark.filterwarnings("ignore::zarr.core.dtype.common.UnstableSpecificationWarning")
@pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning")
def test_api_integration(self, store: ZipStore) -> None:
root = zarr.open_group(store=store, mode="a")
Expand All @@ -92,17 +179,14 @@ def test_api_integration(self, store: ZipStore) -> None:
with pytest.warns(UserWarning, match="Duplicate name: 'foo/c/0/0'"):
z[0, 0] = 100

# TODO: assigning an entire chunk to fill value ends up deleting the chunk which is not supported
# a work around will be needed here.
with pytest.raises(NotImplementedError):
z[0:10, 0:10] = 99
# assigning fill value to a chunk now works via soft-delete
z[0:10, 0:10] = 99

bar = root.create_group("bar", attributes={"hello": "world"})
assert "hello" in dict(bar.attrs)

# keys cannot be deleted
with pytest.raises(NotImplementedError):
del root["bar"]
# keys can now be deleted via soft-delete
del root["bar"]

store.close()

Expand All @@ -126,7 +210,6 @@ async def test_zip_open_mode_translation(
assert store.read_only == read_only

def test_externally_zipped_store(self, tmp_path: Path) -> None:
# See: https://github.com/zarr-developers/zarr-python/issues/2757
zarr_path = tmp_path / "foo.zarr"
root = zarr.open_group(store=zarr_path, mode="w")
root.require_group("foo")
Expand All @@ -140,8 +223,6 @@ def test_externally_zipped_store(self, tmp_path: Path) -> None:
assert list(group.keys()) == list(group.keys())

async def test_list_without_explicit_open(self, tmp_path: Path) -> None:
# ZipStore.list(), list_dir(), and exists() should auto-open
# the zip file just like _get() and _set() do.
zip_path = tmp_path / "data.zip"
zarr_path = tmp_path / "foo.zarr"
root = zarr.open_group(store=zarr_path, mode="w")
Expand Down
Loading