Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,3 +472,24 @@ def ancestors_between(from_snapshot: Snapshot | None, to_snapshot: Snapshot, tab
break
else:
yield from ancestors_of(to_snapshot, table_metadata)


def latest_ancestor_before_timestamp(table_metadata: TableMetadata, timestamp_ms: int) -> Snapshot | None:
"""Find the latest ancestor snapshot whose timestamp is before the provided timestamp.
Args:
table_metadata: The table metadata for a table
timestamp_ms: lookup snapshots before this timestamp
Returns:
The latest ancestor snapshot older than the timestamp, or None if not found.
"""
result: Snapshot | None = None
result_timestamp: int = 0

for ancestor in ancestors_of(table_metadata.current_snapshot(), table_metadata):
if timestamp_ms > ancestor.timestamp_ms > result_timestamp:
result = ancestor
result_timestamp = ancestor.timestamp_ms

return result
21 changes: 21 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
SnapshotSummaryCollector,
Summary,
ancestors_of,
latest_ancestor_before_timestamp,
update_snapshot_summaries,
)
from pyiceberg.table.update import (
Expand Down Expand Up @@ -1008,6 +1009,26 @@ def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots:

return self.set_current_snapshot(snapshot_id=snapshot_id)

def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
"""Rollback the table to the latest snapshot before the given timestamp.
Finds the latest ancestor snapshot whose timestamp is before the given timestamp and rolls back to it.
Args:
timestamp_ms: Rollback to the latest snapshot before this timestamp in milliseconds.
Returns:
This for method chaining
Raises:
ValueError: If no valid snapshot exists older than the given timestamp.
"""
snapshot = latest_ancestor_before_timestamp(self._transaction.table_metadata, timestamp_ms)
if snapshot is None:
raise ValueError(f"Cannot roll back, no valid snapshot older than: {timestamp_ms}")

return self.set_current_snapshot(snapshot_id=snapshot.snapshot_id)

def _is_current_ancestor(self, snapshot_id: int) -> bool:
return snapshot_id in self._current_ancestors()

Expand Down
63 changes: 63 additions & 0 deletions tests/integration/test_snapshot_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,66 @@ def test_rollback_to_snapshot_unknown_id(table_with_snapshots: Table) -> None:

with pytest.raises(ValueError, match="Cannot roll back to unknown snapshot id"):
table_with_snapshots.manage_snapshots().rollback_to_snapshot(snapshot_id=invalid_snapshot_id).commit()


@pytest.mark.integration
def test_rollback_to_timestamp_no_valid_snapshot(table_with_snapshots: Table) -> None:
history = table_with_snapshots.history()
assert len(history) >= 1

oldest_timestamp = history[0].timestamp_ms

with pytest.raises(ValueError, match="Cannot roll back, no valid snapshot older than"):
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=oldest_timestamp).commit()


@pytest.mark.integration
def test_rollback_to_timestamp(table_with_snapshots: Table) -> None:
current_snapshot = table_with_snapshots.current_snapshot()
assert current_snapshot is not None
assert current_snapshot.parent_snapshot_id is not None

parent_snapshot_id = current_snapshot.parent_snapshot_id

table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms).commit()

updated_snapshot = table_with_snapshots.current_snapshot()
assert updated_snapshot is not None
assert updated_snapshot.snapshot_id == parent_snapshot_id


@pytest.mark.integration
def test_rollback_to_timestamp_current_snapshot(table_with_snapshots: Table) -> None:
current_snapshot = table_with_snapshots.current_snapshot()
assert current_snapshot is not None

timestamp_after_current = current_snapshot.timestamp_ms + 100
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=timestamp_after_current).commit()

updated_snapshot = table_with_snapshots.current_snapshot()
assert updated_snapshot is not None
assert updated_snapshot.snapshot_id == current_snapshot.snapshot_id


@pytest.mark.integration
def test_rollback_to_timestamp_chained_with_tag(table_with_snapshots: Table) -> None:
current_snapshot = table_with_snapshots.current_snapshot()
assert current_snapshot is not None
assert current_snapshot.parent_snapshot_id is not None

parent_snapshot_id = current_snapshot.parent_snapshot_id
tag_name = "my-tag"

(
table_with_snapshots.manage_snapshots()
.create_tag(snapshot_id=current_snapshot.snapshot_id, tag_name=tag_name)
.rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms)
.commit()
)

updated_snapshot = table_with_snapshots.current_snapshot()
assert updated_snapshot is not None
assert updated_snapshot.snapshot_id == parent_snapshot_id
assert table_with_snapshots.metadata.refs[tag_name] == SnapshotRef(
snapshot_id=current_snapshot.snapshot_id, snapshot_ref_type="tag"
)
77 changes: 77 additions & 0 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Summary,
ancestors_between,
ancestors_of,
latest_ancestor_before_timestamp,
update_snapshot_summaries,
)
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -456,3 +457,79 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
)
== 2000
)


def test_latest_ancestor_before_timestamp() -> None:
from pyiceberg.table.metadata import TableMetadataV2

# Create metadata with 4 snapshots at ordered timestamps
metadata = TableMetadataV2(
**{
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 4,
"last-updated-ms": 1602638573590,
"last-column-id": 1,
"current-schema-id": 0,
"schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"last-partition-id": 999,
"default-sort-order-id": 0,
"sort-orders": [{"order-id": 0, "fields": []}],
"current-snapshot-id": 4,
"snapshots": [
{
"snapshot-id": 1,
"timestamp-ms": 1000,
"sequence-number": 1,
"summary": {"operation": "append"},
"manifest-list": "s3://a/1.avro",
},
{
"snapshot-id": 2,
"parent-snapshot-id": 1,
"timestamp-ms": 2000,
"sequence-number": 2,
"summary": {"operation": "append"},
"manifest-list": "s3://a/2.avro",
},
{
"snapshot-id": 3,
"parent-snapshot-id": 2,
"timestamp-ms": 3000,
"sequence-number": 3,
"summary": {"operation": "append"},
"manifest-list": "s3://a/3.avro",
},
{
"snapshot-id": 4,
"parent-snapshot-id": 3,
"timestamp-ms": 4000,
"sequence-number": 4,
"summary": {"operation": "append"},
"manifest-list": "s3://a/4.avro",
},
],
}
)

result = latest_ancestor_before_timestamp(metadata, 3500)
assert result is not None
assert result.snapshot_id == 3

result = latest_ancestor_before_timestamp(metadata, 2500)
assert result is not None
assert result.snapshot_id == 2

result = latest_ancestor_before_timestamp(metadata, 5000)
assert result is not None
assert result.snapshot_id == 4

result = latest_ancestor_before_timestamp(metadata, 3000)
assert result is not None
assert result.snapshot_id == 2

result = latest_ancestor_before_timestamp(metadata, 1000)
assert result is None