Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions integrations/python/dataloader/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ dev = [
"mypy>=1.14.0",
"types-requests>=2.31.0",
"opentelemetry-sdk>=1.41.1",
# Test-only: enables PyIceberg's InMemoryCatalog (SqlCatalog over SQLite) and
# partition-transform support for end-to-end split-pruning tests.
"sqlalchemy>=2.0.0",
"pyiceberg-core>=0.5.1,<0.9.0",
]

[tool.hatch.version]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,25 +326,27 @@ def _escape_like(value: str) -> str:


def _literal_to_sql(value: object) -> str:
"""Convert a Python literal to a SQL literal string using sqlglot."""
"""Convert a Python literal to a SQL literal string using sqlglot.

Datetime/date/time values are emitted as plain string literals (ISO format).
DataFusion implicitly coerces string literals to the column type at execution,
and PyIceberg promotes StringLiteral to the matching typed literal during expression binding.
"""
if isinstance(value, str):
return exp.Literal.string(value).sql()
if isinstance(value, bool):
return exp.Boolean(this=True).sql() if value else exp.Boolean(this=False).sql()
if isinstance(value, datetime):
lit = exp.Literal.string(value.strftime("%Y-%m-%d %H:%M:%S.%f%z"))
return exp.Cast(this=lit, to=exp.DataType.build("TIMESTAMP")).sql()
return exp.Literal.string(value.isoformat()).sql()
if isinstance(value, date):
lit = exp.Literal.string(value.isoformat())
return exp.Cast(this=lit, to=exp.DataType.build("DATE")).sql()
return exp.Literal.string(value.isoformat()).sql()
if isinstance(value, time):
if value.tzinfo is not None:
raise TypeError(
"DataFusion does not support timezones for time data types. "
"The time should match the timezone used in the dataset."
)
lit = exp.Literal.string(value.strftime("%H:%M:%S.%f"))
return exp.Cast(this=lit, to=exp.DataType.build("TIME")).sql()
return exp.Literal.string(value.isoformat()).sql()
if isinstance(value, (int, float)):
if isinstance(value, float) and not math.isfinite(value):
return exp.Cast(this=exp.Literal.string(str(value)), to=exp.DataType.build("DOUBLE")).sql()
Expand Down
64 changes: 64 additions & 0 deletions integrations/python/dataloader/tests/test_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,3 +1077,67 @@ def test_projection_udf_with_mixed_case_materializes(tmp_path):
assert result.column("policyField").to_pylist() == ["policy_a", "policy_b", "policy_c"]
# foo returns True; NOT True = False; bar(False, val, ...) returns val (not replaced)
assert result.column("otherField").to_pylist() == ["x", "y", "z"]


# --- End-to-end partition pruning ---


def _build_partitioned_table(tmp_path, days: list):
"""Create a day(ts)-partitioned Iceberg table and write one file per day.

Uses PyIceberg's InMemoryCatalog (SQLite-backed SqlCatalog) so that
`scan().plan_files()` runs the real manifest pruning logic.
"""
import datetime as _dt

from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import DayTransform
from pyiceberg.types import TimestampType

schema = Schema(
NestedField(field_id=1, name=COL_ID, field_type=LongType(), required=False),
NestedField(field_id=2, name="ts", field_type=TimestampType(), required=False),
)
spec = PartitionSpec(
PartitionField(source_id=2, field_id=1000, transform=DayTransform(), name="ts_day"),
)
catalog = InMemoryCatalog("test", warehouse=f"file://{tmp_path}")
catalog.create_namespace("db")
table = catalog.create_table("db.tbl", schema=schema, partition_spec=spec)

arrow_schema = pa.schema([pa.field(COL_ID, pa.int64()), pa.field("ts", pa.timestamp("us"))])
for i, day in enumerate(days):
batch = pa.Table.from_pydict({COL_ID: [i], "ts": [_dt.datetime.combine(day, _dt.time())]}, schema=arrow_schema)
table.append(batch)
return catalog


def test_e2e_datetime_filter_prunes_files_to_expected_splits(tmp_path):
"""End-to-end: a datetime filter on a day-partitioned table reduces the
plan_files() output. Verifies the full path from filter DSL through SQL
emission, scan_optimizer, PyIceberg expression binding, and manifest pruning.
"""
import datetime as _dt

days = [_dt.date(2026, 5, 2), _dt.date(2026, 5, 3), _dt.date(2026, 5, 8)]
catalog = _build_partitioned_table(tmp_path, days)

# No filter: every partition is scanned.
all_splits = list(OpenHouseDataLoader(catalog=catalog, database="db", table="tbl"))
assert len(all_splits) == 3

# Range filter: only the two May 2-3 partitions should be planned.
range_filter = (col("ts") >= _dt.datetime(2026, 5, 2)) & (col("ts") < _dt.datetime(2026, 5, 4))
range_splits = list(OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", filters=range_filter))
assert len(range_splits) == 2
paths = [t.file.file_path for s in range_splits for t in s._file_scan_tasks]
assert all("ts_day=2026-05-02" in p or "ts_day=2026-05-03" in p for p in paths)
assert not any("ts_day=2026-05-08" in p for p in paths)

# Tight filter: only the May 8 partition.
tight_filter = col("ts") >= _dt.datetime(2026, 5, 8)
tight_splits = list(OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", filters=tight_filter))
assert len(tight_splits) == 1
[tight_task] = tight_splits[0]._file_scan_tasks
assert "ts_day=2026-05-08" in tight_task.file.file_path
23 changes: 10 additions & 13 deletions integrations/python/dataloader/tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,58 +356,55 @@ class TestDataFusionLiteralConversion:
def test_datetime_greater_than_or_equal(self):
dt = datetime(2026, 4, 27, tzinfo=UTC)
result = _to_datafusion_sql(col("datepartition") >= dt)
assert result == "\"datepartition\" >= CAST('2026-04-27 00:00:00.000000+0000' AS TIMESTAMP)"
assert result == "\"datepartition\" >= '2026-04-27T00:00:00+00:00'"

def test_datetime_equal(self):
dt = datetime(2026, 4, 27, 12, 30, 45, tzinfo=UTC)
result = _to_datafusion_sql(col("ts") == dt)
assert result == "\"ts\" = CAST('2026-04-27 12:30:45.000000+0000' AS TIMESTAMP)"
assert result == "\"ts\" = '2026-04-27T12:30:45+00:00'"

def test_datetime_with_microseconds(self):
dt = datetime(2026, 4, 27, 12, 30, 45, 123456, tzinfo=UTC)
result = _to_datafusion_sql(col("ts") == dt)
assert result == "\"ts\" = CAST('2026-04-27 12:30:45.123456+0000' AS TIMESTAMP)"
assert result == "\"ts\" = '2026-04-27T12:30:45.123456+00:00'"

def test_datetime_non_utc_timezone_preserved(self):
dt = datetime(2026, 4, 27, 12, 0, 0, tzinfo=timezone(timedelta(hours=5)))
result = _to_datafusion_sql(col("ts") >= dt)
assert result == "\"ts\" >= CAST('2026-04-27 12:00:00.000000+0500' AS TIMESTAMP)"
assert result == "\"ts\" >= '2026-04-27T12:00:00+05:00'"

def test_datetime_naive_no_offset(self):
dt = datetime(2026, 4, 27, 12, 0, 0)
result = _to_datafusion_sql(col("ts") >= dt)
assert result == "\"ts\" >= CAST('2026-04-27 12:00:00.000000' AS TIMESTAMP)"
assert result == "\"ts\" >= '2026-04-27T12:00:00'"

def test_date_greater_than_or_equal(self):
d = date(2026, 4, 27)
result = _to_datafusion_sql(col("datepartition") >= d)
assert result == "\"datepartition\" >= CAST('2026-04-27' AS DATE)"
assert result == "\"datepartition\" >= '2026-04-27'"

def test_datetime_between(self):
dt1 = datetime(2026, 4, 27, tzinfo=UTC)
dt2 = datetime(2026, 5, 1, tzinfo=UTC)
result = _to_datafusion_sql(col("ts").between(dt1, dt2))
assert result == (
"\"ts\" BETWEEN CAST('2026-04-27 00:00:00.000000+0000' AS TIMESTAMP)"
" AND CAST('2026-05-01 00:00:00.000000+0000' AS TIMESTAMP)"
)
assert result == "\"ts\" BETWEEN '2026-04-27T00:00:00+00:00' AND '2026-05-01T00:00:00+00:00'"

def test_datetime_in_compound_filter(self):
dt = datetime(2026, 4, 27, tzinfo=UTC)
f = (col("datepartition") >= dt) & (col("status") == "active")
result = _to_datafusion_sql(f)
assert "CAST('2026-04-27 00:00:00.000000+0000' AS TIMESTAMP)" in result
assert "'2026-04-27T00:00:00+00:00'" in result
assert "\"status\" = 'active'" in result

def test_time_equal(self):
t = time(14, 30, 0)
result = _to_datafusion_sql(col("event_time") == t)
assert result == "\"event_time\" = CAST('14:30:00.000000' AS TIME)"
assert result == "\"event_time\" = '14:30:00'"

def test_time_with_microseconds(self):
t = time(14, 30, 0, 500000)
result = _to_datafusion_sql(col("event_time") == t)
assert result == "\"event_time\" = CAST('14:30:00.500000' AS TIME)"
assert result == "\"event_time\" = '14:30:00.500000'"

def test_time_with_timezone_rejected(self):
t = time(14, 30, 0, tzinfo=timezone(timedelta(hours=5)))
Expand Down
25 changes: 25 additions & 0 deletions integrations/python/dataloader/tests/test_scan_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,31 @@ def test_comparison_types():
assert plan.row_filter == expected_filter, f"row_filter mismatch for: {where_clause}"


def test_datetime_string_literals_pushed_as_strings():
"""`filters._literal_to_sql()` emits plain string literals for datetime/date/time
(see PR #569 + follow-up). The scan optimizer treats them as ordinary string
literals; PyIceberg promotes them to typed literals during expression binding
against the table schema, restoring partition pruning.
"""
cases = [
(
"\"x\" >= '2026-05-02T00:00:00+00:00'",
GreaterThanOrEqual("x", "2026-05-02T00:00:00+00:00"),
),
(
"\"x\" < '2026-05-04T00:00:00'",
LessThan("x", "2026-05-04T00:00:00"),
),
(
"\"x\" = '2026-05-02'",
EqualTo("x", "2026-05-02"),
),
]
for where_clause, expected_filter in cases:
plan = optimize_scan(f'SELECT "a" FROM "db"."tbl" WHERE {where_clause}')
assert plan.row_filter == expected_filter, f"row_filter mismatch for: {where_clause}; got {plan.row_filter!r}"


def test_non_convertible_predicates_not_pushed():
"""Predicates with functions or column-vs-column are not pushed."""
cases = [
Expand Down
Loading