Skip to content

Commit 35d430c

Browse files
authored
Feat: use 1 thread for dbt duckdb targets (#2222)
* Use 1 thread for dbt duckdb targets * Add test
1 parent bb10366 commit 35d430c

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

sqlmesh/dbt/target.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import abc
4+
import logging
45
import sys
56
import typing as t
67
from pathlib import Path
@@ -41,6 +42,8 @@
4142
else:
4243
from typing_extensions import Literal
4344

45+
logger = logging.getLogger(__name__)
46+
4447
IncrementalKind = t.Union[
4548
t.Type[IncrementalByUniqueKeyKind],
4649
t.Type[IncrementalByTimeRangeKind],
@@ -171,6 +174,8 @@ def validate_authentication(
171174
if path is None or path == DUCKDB_IN_MEMORY
172175
else Path(t.cast(str, path)).stem
173176
)
177+
if "threads" in values and t.cast(int, values["threads"]) > 1:
178+
logger.warning("DuckDB does not support concurrency - setting threads to 1.")
174179
return values
175180

176181
def default_incremental_strategy(self, kind: IncrementalKind) -> str:
@@ -190,7 +195,7 @@ def to_sqlmesh(self) -> ConnectionConfig:
190195
kwargs["connector_config"] = self.settings
191196
return DuckDBConnectionConfig(
192197
database=self.path,
193-
concurrent_tasks=self.threads,
198+
concurrent_tasks=1,
194199
**kwargs,
195200
)
196201

tests/dbt/test_config.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import typing as t
33
from pathlib import Path
4+
from shutil import copytree
45
from unittest.mock import PropertyMock
56

67
import pytest
@@ -12,6 +13,7 @@
1213
from sqlmesh.core.model import SqlModel
1314
from sqlmesh.dbt.common import Dependencies
1415
from sqlmesh.dbt.context import DbtContext
16+
from sqlmesh.dbt.loader import sqlmesh_config
1517
from sqlmesh.dbt.model import IncrementalByUniqueKeyKind, Materialization, ModelConfig
1618
from sqlmesh.dbt.project import Project
1719
from sqlmesh.dbt.source import SourceConfig
@@ -379,6 +381,29 @@ def _test_warehouse_config(
379381
return config
380382

381383

384+
def test_duckdb_threads(tmp_path):
385+
dbt_project_dir = "tests/fixtures/dbt/sushi_test"
386+
temp_dir = tmp_path / "sushi_test"
387+
388+
copytree(dbt_project_dir, temp_dir, symlinks=True)
389+
390+
with open(temp_dir / "profiles.yml", "w") as f:
391+
f.write(
392+
"""
393+
sushi:
394+
outputs:
395+
in_memory:
396+
type: duckdb
397+
schema: sushi
398+
threads: 4
399+
target: in_memory
400+
"""
401+
)
402+
403+
config = sqlmesh_config(temp_dir)
404+
assert config.gateways["in_memory"].connection.concurrent_tasks == 1
405+
406+
382407
def test_snowflake_config():
383408
_test_warehouse_config(
384409
"""

0 commit comments

Comments
 (0)