[DataLoader] Handle Cast(Literal, TIMESTAMP/DATE/TIME) in scan optimizer (#569 follow-up)#583
[DataLoader] Handle Cast(Literal, TIMESTAMP/DATE/TIME) in scan optimizer (#569 follow-up)#583hungj wants to merge 4 commits into
Conversation
…zer (linkedin#569 follow-up) PR linkedin#569 added datetime/date/time support to `filters._literal_to_sql()` so the Python -> DataFusion SQL forward direction emits e.g. `CAST('2026-05-02 00:00:00.000000+0000' AS TIMESTAMP)`. However the reverse direction in `scan_optimizer._convert_comparison()` only handles bare `exp.Literal` nodes. When sqlglot parses the SQL back, the value is an `exp.Cast(exp.Literal, DataType(TIMESTAMP))` — so `isinstance(right, exp.Literal)` returns False and the comparison gets silently dropped from the pushdown `row_filter`. The result, for Iceberg tables partitioned by `day(timestamp_col)`, is that the datetime filter never reaches PyIceberg's manifest pruning. Every file in the snapshot gets scanned regardless of the predicate. This patch extends `_convert_comparison` + `_literal_to_python` to also accept `Cast(Literal, TIMESTAMP/DATE/TIME)` and convert it back to the matching Python `datetime`/`date`/`time` value, restoring partition pushdown. Testing Done: - Added `test_cast_timestamp_pushed_as_datetime` covering TIMESTAMP, DATE, and the no-timezone TIMESTAMP form. All round-trip back to the expected datetime/date Filter. - Manually verified end-to-end: with the patch, an Iceberg scan with a daily-partitioned table only opens manifests whose partition value matches the datetime filter (instead of every manifest in the snapshot). BUG=DI-1868
robreeves
left a comment
There was a problem hiding this comment.
Not pushing down cast filters to data reads is not a bug. Predicate pushdown can only support simple literals. More complex filters like cast must be executed by the query engine.
Instead, the changes originally made in #569 should be enhanced to minimize creating complex filters so more filters can be pushed down to the read layer.
Here is what I found.
The fix is in _literal_to_sql, not _literal_to_python. Drop the CAST and emit plain string literals using .isoformat():
# Before (PR #569):
if isinstance(value, datetime):
lit = exp.Literal.string(value.strftime("%Y-%m-%d %H:%M:%S.%f%z"))
return exp.Cast(this=lit, to=exp.DataType.build("TIMESTAMP")).sql()
if isinstance(value, date):
lit = exp.Literal.string(value.isoformat())
return exp.Cast(this=lit, to=exp.DataType.build("DATE")).sql()
# After:
if isinstance(value, datetime):
return exp.Literal.string(value.isoformat()).sql()
if isinstance(value, date):
return exp.Literal.string(value.isoformat()).sql()
if isinstance(value, time):
return exp.Literal.string(value.isoformat()).sql()
Why this works end-to-end:
1. DataFusion implicitly coerces string literals to date/timestamp/time when compared against typed Arrow columns
2. Scan optimizer sees plain exp.Literal nodes — no CAST handling needed, _literal_to_python returns strings unchanged
3. PyIceberg promotes StringLiteral to DateLiteral/TimestampLiteral/TimeLiteral during expression binding against the table schema
| inner = node.this | ||
| if not inner.is_string: | ||
| # Cast over non-string literals — defer to the bare-literal branch. | ||
| return _literal_to_python(inner) |
There was a problem hiding this comment.
This ignores the case. This isn't valid because it doesn't handle valid cases like CAST('1' AS INT). For exmaple if there is an integer column i and the SQL has this filter i < CAST('1' AS INT).
| assert plan.row_filter == expected_filter, f"row_filter mismatch for: {where_clause}" | ||
|
|
||
|
|
||
| def test_cast_timestamp_pushed_as_datetime(): |
There was a problem hiding this comment.
In addition to this, can you add a test in data loader that will test this e2e. It should create a table with multiple files, pass a filter that should filter out some files during planning, then verify the split count. This will be more comprehensive and make sure it is working e2e regardless of the implementation details.
Summary
PR #569 added datetime/date/time support to
filters._literal_to_sql()so the Python → DataFusion SQL forward direction emits e.g.CAST('2026-05-02 00:00:00.000000+0000' AS TIMESTAMP).However the reverse direction in
scan_optimizer._convert_comparison()only handles bareexp.Literalnodes. When sqlglot parses the SQL back, the value is anexp.Cast(exp.Literal, DataType(TIMESTAMP))— soisinstance(right, exp.Literal)returnsFalseand the comparison gets silently dropped from the pushdownrow_filter.The result, for Iceberg tables partitioned by
day(timestamp_col), is that the datetime filter never reaches PyIceberg's manifest pruning. Every file in the snapshot gets scanned regardless of the predicate. Observed in production via adalids://table: requestingdatepartition >= '2026-05-02' AND datepartition < '2026-05-04'ended up reading whichever partitions happened to be at the head of the snapshot (datepartition_day=2026-05-08in our case).Changes
This patch extends
_convert_comparison+_literal_to_pythoninscan_optimizer.pyto also acceptCast(Literal, TIMESTAMP/DATE/TIME)and convert it back to the matching Pythondatetime/date/timevalue, restoring partition pushdown.Testing Done
Added
test_cast_timestamp_pushed_as_datetimecoveringTIMESTAMP(with tz),TIMESTAMP(without tz), andDATE. All three round-trip to the expected datetime/dateFilter.Manually verified end-to-end: with the patch, an Iceberg scan with a daily-partitioned table only opens manifests whose partition value matches the datetime filter (instead of every manifest in the snapshot).
Notes for Reviewers
Direct follow-up to #569 — they go together. Without this fix, that PR's forward-direction support produces a SQL string that the scan optimizer can't reverse, so partition pushdown silently regresses for datetime predicates. The change is small and isolated to
scan_optimizer.py.Jira info
BUG=DI-1868