Skip to content

Commit 2f8b320

Browse files
committed
remove orphan snapshots
1 parent a9ad3a3 commit 2f8b320

7 files changed

Lines changed: 912 additions & 1 deletion

File tree

pyiceberg/io/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import logging
3030
import warnings
3131
from abc import ABC, abstractmethod
32+
from collections.abc import Iterator
33+
from dataclasses import dataclass
34+
from datetime import datetime
3235
from io import SEEK_SET
3336
from types import TracebackType
3437
from typing import (
@@ -254,6 +257,15 @@ def create(self, overwrite: bool = False) -> OutputStream:
254257
"""
255258

256259

260+
@dataclass(frozen=True)
261+
class FileEntry:
262+
"""Metadata only for a single file."""
263+
264+
location: str
265+
size: int
266+
last_modified: datetime | None = None
267+
268+
257269
class FileIO(ABC):
258270
"""A base class for FileIO implementations."""
259271

@@ -291,6 +303,20 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
291303
FileNotFoundError: When the file at the provided location does not exist.
292304
"""
293305

306+
def list_prefix(self, location: str) -> Iterator[FileEntry]:
    """Recursively list every file under the given location.

    This base implementation performs no listing: it always raises, naming
    the concrete class, so implementations that support listing must
    override it.

    Args:
        location (str): A URI or path to recursively list.

    Returns:
        Iterator[FileEntry]: Metadata-only FileEntry objects, one per file.

    Raises:
        NotImplementedError: If the FileIO implementation does not support listing.
    """
    raise NotImplementedError(f"{type(self).__name__} does not support list_prefix")
319+
294320

295321
LOCATION = "location"
296322
WAREHOUSE = "warehouse"

pyiceberg/io/fsspec.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@
2222
import logging
2323
import os
2424
import threading
25-
from collections.abc import Callable
25+
from collections.abc import Callable, Iterator
2626
from copy import copy
27+
from datetime import datetime, timezone
2728
from functools import lru_cache
2829
from typing import (
2930
TYPE_CHECKING,
@@ -83,6 +84,7 @@
8384
S3_SIGNER_ENDPOINT,
8485
S3_SIGNER_ENDPOINT_DEFAULT,
8586
S3_SIGNER_URI,
87+
FileEntry,
8688
FileIO,
8789
InputFile,
8890
InputStream,
@@ -467,6 +469,30 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
467469
fs = self._get_fs_from_uri(uri)
468470
fs.rm(str_location)
469471

472+
def list_prefix(self, location: str) -> Iterator[FileEntry]:
    """Recursively list every file under the given location.

    Args:
        location (str): A URI or path to recursively list.

    Yields:
        FileEntry: One metadata-only entry per listed file. Entries whose
            reported type is neither missing nor "file" are skipped.
    """
    uri = urlparse(location)
    fs = self._get_fs_from_uri(uri)
    # Paths from find() are re-prefixed with the requested scheme, except for
    # scheme-less and "file" locations, which are yielded as returned.
    keep_bare_path = uri.scheme in ("", "file")

    for path, info in fs.find(location, detail=True).items():
        if info.get("type") not in (None, "file"):
            continue

        # Use the first timestamp key holding a usable value. Checking the type
        # (instead of chaining with `or`) keeps a legitimate epoch value of 0 and
        # stops an unusable value under an earlier key from hiding a usable one.
        last_modified: datetime | None = None
        for key in ("mtime", "LastModified", "last_modified"):
            value = info.get(key)
            if isinstance(value, datetime):
                last_modified = value
                break
            # bool is a subclass of int but is never a meaningful timestamp.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                last_modified = datetime.fromtimestamp(value, tz=timezone.utc)
                break

        yield FileEntry(
            location=path if keep_bare_path else f"{uri.scheme}://{path}",
            size=int(info.get("size") or 0),
            last_modified=last_modified,
        )
495+
470496
def _get_fs_from_uri(self, uri: "ParseResult") -> AbstractFileSystem:
471497
"""Get a filesystem from a parsed URI, using hostname for ADLS account resolution."""
472498
if uri.scheme in _ADLS_SCHEMES:

pyiceberg/io/pyarrow.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from pyarrow._s3fs import S3RetryStrategy
6161
from pyarrow.fs import (
6262
FileInfo,
63+
FileSelector,
6364
FileSystem,
6465
FileType,
6566
)
@@ -114,6 +115,7 @@
114115
S3_ROLE_SESSION_NAME,
115116
S3_SECRET_ACCESS_KEY,
116117
S3_SESSION_TOKEN,
118+
FileEntry,
117119
FileIO,
118120
InputFile,
119121
InputStream,
@@ -674,6 +676,36 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
674676
raise PermissionError(f"Cannot delete file, access denied: {location}") from e
675677
raise # pragma: no cover - If some other kind of OSError, raise the raw error
676678

679+
def list_prefix(self, location: str) -> Iterator[FileEntry]:
    """Recursively list every file under the given location.

    Args:
        location (str): A URI or path to recursively list.

    Yields:
        FileEntry: One metadata-only entry per regular file. Non-file entries
            are skipped; a missing location is tolerated because the selector
            is created with allow_not_found=True.
    """
    original = urlparse(location)
    scheme, netloc, path = self.parse_location(location, self.properties)
    fs = self.fs_by_scheme(scheme, netloc)
    selector = FileSelector(path, recursive=True, allow_not_found=True)

    # Decide how to turn the filesystem-relative paths back into locations.
    if original.scheme in ("hdfs", "viewfs"):
        # The authority is re-attached here, so the path must start with "/".
        uri_prefix = f"{original.scheme}://{netloc}"
        ensure_leading_slash = True
    elif original.scheme:
        # Cloud filesystem paths from pyarrow already start with the bucket/container.
        uri_prefix = f"{original.scheme}://"
        ensure_leading_slash = False
    else:
        # Scheme-less input: yield paths exactly as the filesystem reports them.
        uri_prefix = ""
        ensure_leading_slash = False

    for info in fs.get_file_info(selector):
        if info.type != FileType.File:
            continue
        info_path = info.path
        if ensure_leading_slash and not info_path.startswith("/"):
            info_path = "/" + info_path
        yield FileEntry(
            location=f"{uri_prefix}{info_path}",
            size=info.size or 0,
            last_modified=info.mtime,
        )
708+
677709
def __getstate__(self) -> dict[str, Any]:
678710
"""Create a dictionary of the PyArrowFileIO fields used when pickling."""
679711
fileio_copy = copy(self.__dict__)

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ class TableProperties:
205205
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
206206
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
207207

208+
GC_ENABLED = "gc.enabled"
209+
GC_ENABLED_DEFAULT = True
210+
208211

209212
class Transaction:
210213
_table: Table
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
if TYPE_CHECKING:
2626
from pyiceberg.table import Table
27+
from pyiceberg.table.maintenance.orphan_files import RemoveOrphanFiles
2728
from pyiceberg.table.update.snapshot import ExpireSnapshots
2829

2930

@@ -43,3 +44,13 @@ def expire_snapshots(self) -> ExpireSnapshots:
4344
from pyiceberg.table.update.snapshot import ExpireSnapshots
4445

4546
return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))
47+
48+
def remove_orphan_files(self) -> RemoveOrphanFiles:
    """Return a RemoveOrphanFiles builder for removing files unreachable from the table.

    Returns:
        RemoveOrphanFiles builder, constructed from this object's table, for
        configuring and executing orphan file removal.
    """
    # Imported at call time; at module level the name is only imported under TYPE_CHECKING.
    from pyiceberg.table.maintenance.orphan_files import RemoveOrphanFiles

    return RemoveOrphanFiles(self.tbl)

0 commit comments

Comments
 (0)