diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 4ef1645df6..7ebb20a6ad 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -472,3 +472,24 @@ def ancestors_between(from_snapshot: Snapshot | None, to_snapshot: Snapshot, tab break else: yield from ancestors_of(to_snapshot, table_metadata) + + +def latest_ancestor_before_timestamp(table_metadata: TableMetadata, timestamp_ms: int) -> Snapshot | None: + """Find the latest ancestor snapshot whose timestamp is before the provided timestamp. + + Args: + table_metadata: The table metadata for a table + timestamp_ms: lookup snapshots before this timestamp + + Returns: + The latest ancestor snapshot older than the timestamp, or None if not found. + """ + result: Snapshot | None = None + result_timestamp: int = 0 + + for ancestor in ancestors_of(table_metadata.current_snapshot(), table_metadata): + if timestamp_ms > ancestor.timestamp_ms > result_timestamp: + result = ancestor + result_timestamp = ancestor.timestamp_ms + + return result diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 987200bf67..b7c863d84a 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -65,6 +65,7 @@ SnapshotSummaryCollector, Summary, ancestors_of, + latest_ancestor_before_timestamp, update_snapshot_summaries, ) from pyiceberg.table.update import ( @@ -1008,6 +1009,26 @@ def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots: return self.set_current_snapshot(snapshot_id=snapshot_id) + def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots: + """Rollback the table to the latest snapshot before the given timestamp. + + Finds the latest ancestor snapshot whose timestamp is before the given timestamp and rolls back to it. + + Args: + timestamp_ms: Rollback to the latest snapshot before this timestamp in milliseconds. + + Returns: + This for method chaining + + Raises: + ValueError: If no valid snapshot exists older than the given timestamp. + """ + snapshot = latest_ancestor_before_timestamp(self._transaction.table_metadata, timestamp_ms) + if snapshot is None: + raise ValueError(f"Cannot roll back, no valid snapshot older than: {timestamp_ms}") + + return self.set_current_snapshot(snapshot_id=snapshot.snapshot_id) + def _is_current_ancestor(self, snapshot_id: int) -> bool: return snapshot_id in self._current_ancestors() diff --git a/tests/integration/test_snapshot_operations.py b/tests/integration/test_snapshot_operations.py index 8755e95fbb..6fd3aadaa3 100644 --- a/tests/integration/test_snapshot_operations.py +++ b/tests/integration/test_snapshot_operations.py @@ -268,3 +268,66 @@ def test_rollback_to_snapshot_unknown_id(table_with_snapshots: Table) -> None: with pytest.raises(ValueError, match="Cannot roll back to unknown snapshot id"): table_with_snapshots.manage_snapshots().rollback_to_snapshot(snapshot_id=invalid_snapshot_id).commit() + + +@pytest.mark.integration +def test_rollback_to_timestamp_no_valid_snapshot(table_with_snapshots: Table) -> None: + history = table_with_snapshots.history() + assert len(history) >= 1 + + oldest_timestamp = history[0].timestamp_ms + + with pytest.raises(ValueError, match="Cannot roll back, no valid snapshot older than"): + table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=oldest_timestamp).commit() + + +@pytest.mark.integration +def test_rollback_to_timestamp(table_with_snapshots: Table) -> None: + current_snapshot = table_with_snapshots.current_snapshot() + assert current_snapshot is not None + assert current_snapshot.parent_snapshot_id is not None + + parent_snapshot_id = current_snapshot.parent_snapshot_id + + table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms).commit() + + updated_snapshot = table_with_snapshots.current_snapshot() + assert updated_snapshot is not None + assert updated_snapshot.snapshot_id == parent_snapshot_id + + +@pytest.mark.integration +def test_rollback_to_timestamp_current_snapshot(table_with_snapshots: Table) -> None: + current_snapshot = table_with_snapshots.current_snapshot() + assert current_snapshot is not None + + timestamp_after_current = current_snapshot.timestamp_ms + 100 + table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=timestamp_after_current).commit() + + updated_snapshot = table_with_snapshots.current_snapshot() + assert updated_snapshot is not None + assert updated_snapshot.snapshot_id == current_snapshot.snapshot_id + + +@pytest.mark.integration +def test_rollback_to_timestamp_chained_with_tag(table_with_snapshots: Table) -> None: + current_snapshot = table_with_snapshots.current_snapshot() + assert current_snapshot is not None + assert current_snapshot.parent_snapshot_id is not None + + parent_snapshot_id = current_snapshot.parent_snapshot_id + tag_name = "my-tag" + + ( + table_with_snapshots.manage_snapshots() + .create_tag(snapshot_id=current_snapshot.snapshot_id, tag_name=tag_name) + .rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms) + .commit() + ) + + updated_snapshot = table_with_snapshots.current_snapshot() + assert updated_snapshot is not None + assert updated_snapshot.snapshot_id == parent_snapshot_id + assert table_with_snapshots.metadata.refs[tag_name] == SnapshotRef( + snapshot_id=current_snapshot.snapshot_id, snapshot_ref_type="tag" + ) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index d26562ad8f..4aa9521b78 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -30,6 +30,7 @@ Summary, ancestors_between, ancestors_of, + latest_ancestor_before_timestamp, update_snapshot_summaries, ) from pyiceberg.transforms import IdentityTransform @@ -456,3 +457,79 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: ) == 2000 ) + + +def test_latest_ancestor_before_timestamp() -> None: + from pyiceberg.table.metadata import TableMetadataV2 + + # Create metadata with 4 snapshots at ordered timestamps + metadata = TableMetadataV2( + **{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 4, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "current-snapshot-id": 4, + "snapshots": [ + { + "snapshot-id": 1, + "timestamp-ms": 1000, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/1.avro", + }, + { + "snapshot-id": 2, + "parent-snapshot-id": 1, + "timestamp-ms": 2000, + "sequence-number": 2, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/2.avro", + }, + { + "snapshot-id": 3, + "parent-snapshot-id": 2, + "timestamp-ms": 3000, + "sequence-number": 3, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/3.avro", + }, + { + "snapshot-id": 4, + "parent-snapshot-id": 3, + "timestamp-ms": 4000, + "sequence-number": 4, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/4.avro", + }, + ], + } + ) + + result = latest_ancestor_before_timestamp(metadata, 3500) + assert result is not None + assert result.snapshot_id == 3 + + result = latest_ancestor_before_timestamp(metadata, 2500) + assert result is not None + assert result.snapshot_id == 2 + + result = latest_ancestor_before_timestamp(metadata, 5000) + assert result is not None + assert result.snapshot_id == 4 + + result = latest_ancestor_before_timestamp(metadata, 3000) + assert result is not None + assert result.snapshot_id == 2 + + result = latest_ancestor_before_timestamp(metadata, 1000) + assert result is None