Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
repos:
# Ruff - replaces black, isort, flake8
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.7.4
rev: v0.15.2
hooks:
# Run the linter
- id: ruff
Expand All @@ -15,7 +15,7 @@ repos:

# mypy - type checking
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.13.0
rev: v1.19.1
hooks:
- id: mypy
additional_dependencies:
Expand All @@ -37,7 +37,7 @@ repos:

# Standard pre-commit hooks
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
rev: v6.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
Expand All @@ -52,7 +52,7 @@ repos:

# Security scanning with bandit
- repo: https://github.com/PyCQA/bandit
rev: 1.7.10
rev: 1.9.3
hooks:
- id: bandit
args: [-c, pyproject.toml, -r, src/]
Expand Down
8 changes: 3 additions & 5 deletions src/tablesleuth/api/routers/gizmosql.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,13 @@ def compare_performance(req: CompareRequest) -> dict[str, Any]:
profiler.register_delta_table_with_version(
"ver_b", req.path, int(req.id_b), storage_options=req.storage_options
)
label_a, label_b = "ver_a", "ver_b"
analyzer = SnapshotPerformanceAnalyzer(profiler)
comparison = analyzer.compare_query_performance("ver_a", "ver_b", req.query)
return _serialize_comparison(comparison)

else:
raise ValueError(f"Unsupported format: {req.format}")

analyzer = SnapshotPerformanceAnalyzer(profiler)
comparison = analyzer.compare_query_performance(label_a, label_b, req.query)
return _serialize_comparison(comparison)

except HTTPException:
raise
except ValueError as exc:
Expand Down
2 changes: 1 addition & 1 deletion src/tablesleuth/cli/config_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def config_check(verbose: bool, with_gizmosql: bool) -> None:
"TABLESLEUTH_CATALOG_NAME": "Default catalog override",
"TABLESLEUTH_GIZMO_URI": "GizmoSQL URI override",
"TABLESLEUTH_GIZMO_USERNAME": "GizmoSQL username override",
"TABLESLEUTH_GIZMO_PASSWORD": "GizmoSQL password override",
"TABLESLEUTH_GIZMO_PASSWORD": "GizmoSQL password override", # nosec B105
"PYICEBERG_HOME": "PyIceberg config directory",
}

Expand Down
207 changes: 110 additions & 97 deletions src/tablesleuth/services/iceberg_manifest_patch.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
"""Patch Iceberg snapshot metadata to work around DuckDB's uppercase file_format bug.
"""Patch Iceberg snapshot metadata to work around DuckDB iceberg_scan() issues.

DuckDB's iceberg_scan() rejects delete files whose file_format is stored as 'PARQUET'
(uppercase) in the manifest avro — it only accepts 'parquet' (lowercase). This module
creates a lightweight, temporary, locally-patched copy of the metadata chain:
Three problems are corrected:

metadata.json → patched temp copy (JSON rewrite, trivial)
manifest-list.avro → patched temp copy (avro rewrite via fastavro, redirects
delete-manifest paths to temp copies)
delete manifests → fastavro-rewritten temp copies (file_format lowercased,
handles null/Snappy/deflate codecs transparently)
1. **Uppercase file_format** — DuckDB's ``iceberg_scan()`` rejects delete files whose
``file_format`` is stored as ``'PARQUET'`` (uppercase); it only accepts ``'parquet'``.
Affected delete manifest avro files are re-encoded with the value lowercased.

2. **Stale current-snapshot-id** — DuckDB applies delete files based on the table's
``current-snapshot-id`` rather than the ``version =>`` argument. The patched
``metadata.json`` always sets ``current-snapshot-id`` to the target snapshot.

3. **Relative paths in local tables** — When an Iceberg table lives on the local
filesystem, manifest paths and data file paths stored in the metadata chain are
often relative. DuckDB resolves them from GizmoSQL's working directory, which
differs from the server's CWD on Windows. The patched metadata chain rewrites
all relative paths to absolute posix paths so DuckDB can find them regardless of
CWD.

This module creates a lightweight, temporary, locally-patched copy of the metadata chain:

metadata.json → patched temp copy (JSON rewrite)
manifest-list.avro → patched temp copy (avro rewrite via fastavro; redirects
any re-encoded manifests and resolves relative paths)
manifests (all) → fastavro-rewritten temp copies when needed:
- data manifests: relative file_path resolved to absolute
- delete manifests: same + file_format lowercased

Data manifests and all actual data / delete Parquet files are NOT copied or modified;
they are referenced by their original S3 / local paths.
Expand Down Expand Up @@ -61,11 +77,26 @@ def _read_bytes(uri: str) -> bytes:
return Path(uri).read_bytes()


def _patch_file_format_in_record(obj: Any) -> tuple[Any, bool]:
"""Recursively lowercase any file_format == 'PARQUET' field in a deserialized Avro object.
def _is_relative_local_path(path: str) -> bool:
"""Return True if *path* is a relative local filesystem path (not a URI, not absolute).

S3 URIs, file:// URIs, and absolute OS paths all return False.
"""
if not path:
return False
if path.startswith(("s3://", "s3a://", "gs://", "az://", "abfs://", "file://")):
return False
return not Path(path).is_absolute()


def _patch_manifest_record(obj: Any, fix_format: bool) -> tuple[Any, bool]:
"""Recursively patch a deserialized Avro manifest record.

Handles any nesting depth so it works regardless of whether the manifest uses
the v1 or v2 Iceberg schema layout (direct record vs union-wrapped struct).
Two transformations:
- ``file_path`` fields whose value is a relative local path are resolved to
absolute posix paths (so DuckDB can open them from any CWD).
- ``file_format`` fields whose value is ``'PARQUET'`` are lowercased to
``'parquet'`` when *fix_format* is True (delete manifests only).

Returns:
Tuple of (possibly-modified object, changed_flag).
Expand All @@ -74,11 +105,14 @@ def _patch_file_format_in_record(obj: Any) -> tuple[Any, bool]:
changed = False
new_obj: dict[str, Any] = {}
for k, v in obj.items():
if k == "file_format" and isinstance(v, str) and v == "PARQUET":
if k == "file_path" and isinstance(v, str) and _is_relative_local_path(v):
new_obj[k] = Path(v).resolve().as_posix()
changed = True
elif k == "file_format" and fix_format and isinstance(v, str) and v == "PARQUET":
new_obj[k] = "parquet"
changed = True
else:
new_v, sub_changed = _patch_file_format_in_record(v)
new_v, sub_changed = _patch_manifest_record(v, fix_format)
new_obj[k] = new_v
if sub_changed:
changed = True
Expand All @@ -87,7 +121,7 @@ def _patch_file_format_in_record(obj: Any) -> tuple[Any, bool]:
changed = False
new_list: list[Any] = []
for item in obj:
new_item, sub_changed = _patch_file_format_in_record(item)
new_item, sub_changed = _patch_manifest_record(item, fix_format)
new_list.append(new_item)
if sub_changed:
changed = True
Expand All @@ -96,14 +130,17 @@ def _patch_file_format_in_record(obj: Any) -> tuple[Any, bool]:
return obj, False


def _patch_delete_manifest(content: bytes) -> bytes:
"""Rewrite a delete manifest avro, lowercasing file_format 'PARQUET' → 'parquet'.
def _patch_manifest(content: bytes, is_delete: bool) -> bytes:
"""Patch a manifest avro file using fastavro round-tripping.

Uses fastavro for proper Avro round-tripping so it works with null, Snappy,
and deflate codecs. Falls back to binary substitution if fastavro fails.
For data manifests (``is_delete=False``): resolves relative ``file_path`` values.
For delete manifests (``is_delete=True``): also lowercases ``file_format``.

Falls back to binary ``b'PARQUET'`` → ``b'parquet'`` substitution for delete
manifests if fastavro fails (handles most null-codec avro files).

Returns the original bytes object unchanged if no patching was needed.
Caller uses identity comparison (patched is content) to detect this.
Caller uses identity comparison (``patched is content``) to detect this.
"""
import fastavro

Expand All @@ -114,7 +151,7 @@ def _patch_delete_manifest(content: bytes) -> bytes:
changed = False

for record in reader:
new_record, was_changed = _patch_file_format_in_record(record)
new_record, was_changed = _patch_manifest_record(record, fix_format=is_delete)
records.append(new_record)
if was_changed:
changed = True
Expand All @@ -124,27 +161,22 @@ def _patch_delete_manifest(content: bytes) -> bytes:

out = io.BytesIO()
fastavro.writer(out, fastavro.parse_schema(schema), records)
patched = out.getvalue()
logger.warning(
"Patched delete manifest: lowercased %d file_format field(s)",
sum(
1
for r in records
if isinstance(r.get("data_file"), dict)
and r["data_file"].get("file_format") == "parquet"
),
)
return patched
return out.getvalue()

except Exception as exc:
logger.warning("fastavro round-trip failed (%s), falling back to binary patch", exc)
if b"PARQUET" not in content:
return content
return content.replace(b"PARQUET", b"parquet")
if is_delete and b"PARQUET" in content:
return content.replace(b"PARQUET", b"parquet")
return content


def _rewrite_manifest_list(content: bytes, path_map: dict[str, str]) -> bytes:
"""Rewrite a manifest-list avro file, updating manifest_path values per path_map.
"""Rewrite a manifest-list avro file, updating manifest_path values.

Two transformations are applied:
- Paths in *path_map* are replaced with their mapped values (patched manifests).
- Relative local paths not in path_map are resolved to absolute posix paths so
DuckDB can open them when the manifest-list lives in a temp directory.

Uses fastavro for proper avro round-tripping (path strings change length so
binary substitution is not safe here).
Expand All @@ -159,6 +191,9 @@ def _rewrite_manifest_list(content: bytes, path_map: dict[str, str]) -> bytes:
if orig in path_map:
record = dict(record) # type: ignore[arg-type]
record["manifest_path"] = path_map[orig]
elif _is_relative_local_path(orig):
record = dict(record) # type: ignore[arg-type]
record["manifest_path"] = Path(orig).resolve().as_posix()
records.append(record) # type: ignore[arg-type]

out = io.BytesIO()
Expand All @@ -178,21 +213,7 @@ def patched_iceberg_metadata(
) -> Generator[str, None, None]: # noqa: UP043
"""Yield a ``file://`` URI for a locally-patched copy of Iceberg snapshot metadata.

Two problems are corrected:

1. **DuckDB delete-file format bug** — DuckDB's ``iceberg_scan()`` rejects delete
manifests whose ``file_format`` is ``'PARQUET'`` (uppercase). Affected avro
files are re-encoded with the value lowercased.

2. **Stale current-snapshot-id** — DuckDB applies delete files based on the
table's ``current-snapshot-id`` rather than the ``version =>`` argument.
When comparing an older snapshot against a newer one that introduced deletes,
the older snapshot would incorrectly inherit those deletes. The patched
metadata sets ``current-snapshot-id`` to the target snapshot so DuckDB only
sees the delete files that belong to that particular snapshot.

A temporary local ``metadata.json`` is always written (even when no avro
re-encoding is needed) to carry the corrected ``current-snapshot-id``.
See module docstring for the three problems corrected.

Args:
native_table: PyIceberg ``Table`` instance (from ``IcebergTableInfo.native_table``).
Expand All @@ -209,8 +230,7 @@ def patched_iceberg_metadata(
snapshot_id,
)

# Read the metadata JSON (always local-accessible since IcebergMetadataService
# already resolved it; for S3 tables PyIceberg caches/fetches it).
# Read the metadata JSON.
try:
meta_bytes = _read_bytes(metadata_location)
except Exception as exc:
Expand All @@ -235,7 +255,7 @@ def patched_iceberg_metadata(
manifest_list_uri: str = target_snap["manifest-list"]
logger.debug("patched_iceberg_metadata: manifest_list_uri=%r", manifest_list_uri)

# Read the manifest-list avro to discover which manifests are DELETE manifests.
# Read the manifest-list avro.
try:
ml_bytes = _read_bytes(manifest_list_uri)
except Exception as exc:
Expand All @@ -248,78 +268,71 @@ def patched_iceberg_metadata(
ml_reader = fastavro.reader(io.BytesIO(ml_bytes))
ml_records: list[dict] = list(ml_reader) # type: ignore[arg-type]

# Identify DELETE manifests (content == 1 in the manifest-list schema).
delete_manifest_uris = [r["manifest_path"] for r in ml_records if r.get("content", 0) == 1]

logger.debug(
"patched_iceberg_metadata: found %d delete manifest(s): %r",
len(delete_manifest_uris),
delete_manifest_uris,
)
logger.debug("patched_iceberg_metadata: manifest-list has %d entries", len(ml_records))

# Always create a patched metadata copy even when no delete manifests need format
# fixing. DuckDB's iceberg_scan() applies delete files based on the table's
# *current-snapshot-id* rather than on the version specified via ``version =>``.
# Setting current-snapshot-id to the target snapshot in a local copy ensures
# DuckDB only sees delete files that belong to that snapshot, preventing older
# snapshots from inheriting delete records added by a newer current snapshot.
with tempfile.TemporaryDirectory(prefix="tablesleuth_iceberg_patch_") as tmpdir:
tmp = Path(tmpdir)
path_map: dict[str, str] = {} # original URI → local posix path
# Maps original manifest URI → local posix path for any re-encoded manifest.
path_map: dict[str, str] = {}

# Process ALL manifests — both data (content=0) and delete (content=1).
# Data manifests may have relative file_path values; delete manifests may
# additionally have uppercase file_format values.
for idx, ml_record in enumerate(ml_records):
manifest_uri: str = ml_record.get("manifest_path", "")
is_delete = ml_record.get("content", 0) == 1

for idx, del_uri in enumerate(delete_manifest_uris):
try:
raw = _read_bytes(del_uri)
raw = _read_bytes(manifest_uri)
except Exception as exc:
logger.warning("Cannot read delete manifest %r: %s", del_uri, exc)
logger.warning("Cannot read manifest %r: %s", manifest_uri, exc)
continue

patched = _patch_delete_manifest(raw)
patched = _patch_manifest(raw, is_delete=is_delete)
if patched is raw:
# No uppercase PARQUET found — no format patch needed for this manifest.
logger.debug(
"patched_iceberg_metadata: delete manifest %r needed no patching", del_uri
"patched_iceberg_metadata: manifest %r needed no patching", manifest_uri
)
continue

local_name = f"delete_manifest_{idx}.avro"
local_name = f"manifest_{idx}.avro"
local_path = tmp / local_name
local_path.write_bytes(patched)
# Use posix path (no file:// prefix) so DuckDB can open it directly.
# file:///C:/... URIs get mangled by DuckDB's internal path stripping
# (file:// stripped → /C:/... which is invalid on Windows).
local_posix = local_path.as_posix()
path_map[del_uri] = local_posix
path_map[manifest_uri] = local_posix
logger.debug(
"patched_iceberg_metadata: patched delete manifest %r → %s", del_uri, local_posix
"patched_iceberg_metadata: patched manifest %r → %s", manifest_uri, local_posix
)

# Rewrite the manifest-list only when some delete manifests were re-encoded.
# Rewrite the manifest-list when: (a) some manifests were re-encoded, OR
# (b) it contains relative manifest_path values (Windows local tables).
needs_ml_rewrite = bool(path_map) or any(
_is_relative_local_path(r.get("manifest_path", "")) for r in ml_records
)

new_ml_posix: str | None = None
if path_map:
if needs_ml_rewrite:
patched_ml = _rewrite_manifest_list(ml_bytes, path_map)
ml_path = tmp / "manifest_list.avro"
ml_path.write_bytes(patched_ml)
new_ml_posix = ml_path.as_posix()
logger.debug("patched_iceberg_metadata: rewrote manifest-list → %s", new_ml_posix)
else:
logger.debug(
"patched_iceberg_metadata: %d delete manifest(s) found; none needed format patching",
len(delete_manifest_uris),
)

# Always rewrite the metadata JSON so that:
# 1. current-snapshot-id points to the target snapshot (not the table's
# current HEAD), preventing DuckDB from applying newer delete files.
# 2. If delete manifests were re-encoded, the manifest-list path is updated.
# Always rewrite metadata.json to:
# 1. Set current-snapshot-id to target snapshot (prevents DuckDB from
# applying delete files from newer snapshots to this one).
# 2. Point this snapshot's manifest-list to the patched copy (if any).
# 3. Resolve relative manifest-list paths for all other snapshots.
patched_meta = json.loads(json.dumps(metadata)) # deep copy
patched_meta["current-snapshot-id"] = snapshot_id

if new_ml_posix is not None:
for snap in patched_meta.get("snapshots", []):
if snap.get("snapshot-id") == snapshot_id:
snap["manifest-list"] = new_ml_posix
break
for snap in patched_meta.get("snapshots", []):
ml = snap.get("manifest-list", "")
if snap.get("snapshot-id") == snapshot_id and new_ml_posix is not None:
snap["manifest-list"] = new_ml_posix
elif _is_relative_local_path(ml):
snap["manifest-list"] = Path(ml).resolve().as_posix()

meta_path = tmp / "metadata.json"
meta_path.write_text(json.dumps(patched_meta), encoding="utf-8")
Expand Down
Loading
Loading