Skip to content

Commit c1ce829

Browse files
authored
Feat: support Trino Delta Lake connector (#2330)
* Add Trino Delta Lake connector support * Clarify timestamp conversion explanation * Address PR feedback
1 parent 68ea9f6 commit c1ce829

File tree

11 files changed

+220
-18
lines changed

11 files changed

+220
-18
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,4 @@ spark-pyspark-test:
179179
pytest -n auto -m "spark_pyspark"
180180

181181
trino-test:
182-
pytest -n auto -m "trino or trino_iceberg"
182+
pytest -n auto -m "trino or trino_iceberg or trino_delta"

docs/integrations/engines/trino.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pip install "trino[external-authentication-token-cache]"
1717

1818
### Trino Connector Support
1919

20-
The trino engine adapter has been tested against the [Hive Connector](https://trino.io/docs/current/connector/hive.html) and the [Iceberg Connector](https://trino.io/docs/current/connector/iceberg.html).
20+
The trino engine adapter has been tested against the [Hive Connector](https://trino.io/docs/current/connector/hive.html), [Iceberg Connector](https://trino.io/docs/current/connector/iceberg.html), and [Delta Lake Connector](https://trino.io/docs/current/connector/delta-lake.html).
2121

2222
Please let us know on [Slack](https://tobikodata.com/slack) if you want to use another connector or have already tried one.
2323

@@ -49,6 +49,17 @@ iceberg.catalog.type=hive_metastore
4949

5050
The `jdbc`, `rest` and `nessie` catalogs do not support views and are thus incompatible with SQLMesh.
5151

52+
#### Delta Lake Connector Configuration
53+
54+
The Trino engine adapter's Delta Lake connector support has only been tested with the Hive metastore catalog type.
55+
56+
The [properties file](https://trino.io/docs/current/connector/delta-lake.html#general-configuration) must include the Hive metastore URI and catalog name in addition to any other [general properties](https://trino.io/docs/current/object-storage/metastores.html#general-metastore-properties).
57+
58+
``` properties linenums="1"
59+
hive.metastore.uri=thrift://example.net:9083
60+
delta.hive-catalog-name=datalake_delta # example catalog name, can be any valid string
61+
```
62+
5263
### Connection options
5364

5465
| Option | Description | Type | Required |

pytest.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ markers =
3131
spark: test for Spark
3232
trino: test for Trino (Hive connector)
3333
trino_iceberg: test for Trino (Iceberg connector)
34+
trino_delta: test for Trino (Delta connector)
3435
addopts = -n 0 --dist=loadgroup
3536

3637
# Set this to True to enable logging during tests

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ def _build_view_properties_exp(
224224
def _truncate_comment(
225225
self, comment: str, length: t.Optional[int], escape_backslash: bool = False
226226
) -> str:
227-
# iceberg does not have a comment length limit
228-
if self.current_catalog_type == "iceberg":
227+
# iceberg and delta do not have a comment length limit
228+
if self.current_catalog_type in ("iceberg", "delta"):
229229
return comment
230230
return super()._truncate_comment(comment, length, escape_backslash)
231231

sqlmesh/core/engine_adapter/trino.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@
2424
SourceQuery,
2525
set_catalog,
2626
)
27+
from sqlmesh.utils.date import TimeLike
2728

2829
if t.TYPE_CHECKING:
2930
from trino.dbapi import Connection as TrinoConnection
3031

3132
from sqlmesh.core._typing import SchemaName, TableName
32-
from sqlmesh.core.engine_adapter._typing import DF
33+
from sqlmesh.core.engine_adapter._typing import DF, QueryOrDF
3334

3435

3536
@set_catalog()
@@ -183,3 +184,80 @@ def _df_to_source_queries(
183184
df[column] = pd.to_datetime(df[column]).map(lambda x: x.isoformat(" ")) # type: ignore
184185

185186
return super()._df_to_source_queries(df, columns_to_types, batch_size, target_table)
187+
188+
def _build_schema_exp(
189+
self,
190+
table: exp.Table,
191+
columns_to_types: t.Dict[str, exp.DataType],
192+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
193+
expressions: t.Optional[t.List[exp.PrimaryKey]] = None,
194+
is_view: bool = False,
195+
) -> exp.Schema:
196+
if self.current_catalog_type == "delta_lake":
197+
columns_to_types = self._to_delta_ts(columns_to_types)
198+
199+
return super()._build_schema_exp(
200+
table, columns_to_types, column_descriptions, expressions, is_view
201+
)
202+
203+
def _scd_type_2(
204+
self,
205+
target_table: TableName,
206+
source_table: QueryOrDF,
207+
unique_key: t.Sequence[exp.Expression],
208+
valid_from_name: str,
209+
valid_to_name: str,
210+
execution_time: TimeLike,
211+
invalidate_hard_deletes: bool = True,
212+
updated_at_name: t.Optional[str] = None,
213+
check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None,
214+
updated_at_as_valid_from: bool = False,
215+
execution_time_as_valid_from: bool = False,
216+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
217+
table_description: t.Optional[str] = None,
218+
column_descriptions: t.Optional[t.Dict[str, str]] = None,
219+
) -> None:
220+
if columns_to_types and self.current_catalog_type == "delta_lake":
221+
columns_to_types = self._to_delta_ts(columns_to_types)
222+
223+
return super()._scd_type_2(
224+
target_table,
225+
source_table,
226+
unique_key,
227+
valid_from_name,
228+
valid_to_name,
229+
execution_time,
230+
invalidate_hard_deletes,
231+
updated_at_name,
232+
check_columns,
233+
updated_at_as_valid_from,
234+
execution_time_as_valid_from,
235+
columns_to_types,
236+
table_description,
237+
column_descriptions,
238+
)
239+
240+
# delta_lake only supports two timestamp data types. This method converts other
241+
# timestamp types to those two for use in DDL statements. Trino/delta automatically
242+
# converts the data values to the correct type on write, so we only need to handle
243+
# the column types in DDL.
244+
# - `timestamp(6)` for non-timezone-aware
245+
# - `timestamp(3) with time zone` for timezone-aware
246+
# https://trino.io/docs/current/connector/delta-lake.html#delta-lake-to-trino-type-mapping
247+
def _to_delta_ts(
248+
self, columns_to_types: t.Dict[str, exp.DataType]
249+
) -> t.Dict[str, exp.DataType]:
250+
ts6 = exp.DataType.build("timestamp(6)")
251+
ts3_tz = exp.DataType.build("timestamp(3) with time zone")
252+
253+
delta_columns_to_types = {
254+
k: ts6 if v.is_type(exp.DataType.Type.TIMESTAMP) else v
255+
for k, v in columns_to_types.items()
256+
}
257+
258+
delta_columns_to_types = {
259+
k: ts3_tz if v.is_type(exp.DataType.Type.TIMESTAMPTZ) else v
260+
for k, v in delta_columns_to_types.items()
261+
}
262+
263+
return delta_columns_to_types

tests/core/engine_adapter/config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,18 @@ gateways:
3030
register_comments: true
3131
state_connection:
3232
type: duckdb
33+
inttest_trino_delta:
34+
connection:
35+
type: trino
36+
host: localhost
37+
port: 8080
38+
user: admin
39+
catalog: datalake_delta
40+
http_scheme: http
41+
retries: 20
42+
register_comments: true
43+
state_connection:
44+
type: duckdb
3345
inttest_spark:
3446
connection:
3547
type: spark

tests/core/engine_adapter/docker-compose.yaml

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ services:
6060
volumes:
6161
- ./trino/initdb.sql:/docker-entrypoint-initdb.d/initdb.sql
6262

63+
# A second metastore DB is needed because testing all of hive/iceberg/delta
64+
# creates too many connections for a single postgres DB.
65+
trino_iceberg_delta_metastore_db:
66+
image: postgres
67+
hostname: trino_iceberg_delta_metastore_db
68+
environment:
69+
POSTGRES_USER: hive
70+
POSTGRES_PASSWORD: hive
71+
volumes:
72+
- ./trino/initdb.sql:/docker-entrypoint-initdb.d/initdb.sql
73+
6374
trino-datalake-hive-metastore:
6475
hostname: trino-datalake-hive-metastore
6576
image: 'starburstdata/hive:3.1.2-e.15'
@@ -91,13 +102,26 @@ services:
91102
image: 'starburstdata/hive:3.1.2-e.15'
92103
environment:
93104
HIVE_METASTORE_DRIVER: org.postgresql.Driver
94-
HIVE_METASTORE_JDBC_URL: jdbc:postgresql://trino_metastore_db:5432/datalake_iceberg_metastore
105+
HIVE_METASTORE_JDBC_URL: jdbc:postgresql://trino_iceberg_delta_metastore_db:5432/datalake_iceberg_metastore
95106
HIVE_METASTORE_USER: hive
96107
HIVE_METASTORE_PASSWORD: hive
97108
HIVE_METASTORE_WAREHOUSE_DIR: s3://trino/datalake_iceberg
98109
<<: *hive_metastore_environments
99110
depends_on:
100-
- trino_metastore_db
111+
- trino_iceberg_delta_metastore_db
112+
113+
trino-datalake-delta-hive-metastore:
114+
hostname: trino-datalake-delta-hive-metastore
115+
image: 'starburstdata/hive:3.1.2-e.15'
116+
environment:
117+
HIVE_METASTORE_DRIVER: org.postgresql.Driver
118+
HIVE_METASTORE_JDBC_URL: jdbc:postgresql://trino_iceberg_delta_metastore_db:5432/datalake_delta_metastore
119+
HIVE_METASTORE_USER: hive
120+
HIVE_METASTORE_PASSWORD: hive
121+
HIVE_METASTORE_WAREHOUSE_DIR: s3://trino/datalake_delta
122+
<<: *hive_metastore_environments
123+
depends_on:
124+
- trino_iceberg_delta_metastore_db
101125

102126
# Spark Stack
103127
spark:
@@ -154,8 +178,10 @@ services:
154178
/usr/bin/mc config --quiet host add myminio http://minio:9000 minio minio123;
155179
/usr/bin/mc mb --quiet myminio/trino/datalake;
156180
/usr/bin/mc mb --quiet myminio/trino/datalake_iceberg;
181+
/usr/bin/mc mb --quiet myminio/trino/datalake_delta;
157182
/usr/bin/mc mb --quiet myminio/trino/testing;
158183
/usr/bin/mc mb --quiet myminio/trino/testing_iceberg;
184+
/usr/bin/mc mb --quiet myminio/trino/testing_delta;
159185
/usr/bin/mc mb --quiet myminio/spark/datalake;
160186
/usr/bin/mc mb --quiet myminio/spark/testing
161187
"

tests/core/engine_adapter/test_integration.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,15 @@ def config() -> Config:
449449
pytest.mark.xdist_group("engine_integration_trino_iceberg"),
450450
],
451451
),
452+
pytest.param(
453+
"trino_delta",
454+
marks=[
455+
pytest.mark.docker,
456+
pytest.mark.engine,
457+
pytest.mark.trino_delta,
458+
pytest.mark.xdist_group("engine_integration_trino_delta"),
459+
],
460+
),
452461
pytest.param(
453462
"spark",
454463
marks=[

0 commit comments

Comments
 (0)