Merged
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
@@ -32,7 +32,7 @@ jobs:
java-version: 17
distribution: "zulu"
- name: Install the project
run: uv sync --locked --extra dev_local
run: uv sync --locked --extra dev
- name: Run code checks
run: uv run ruff check
- name: Check code formatting
@@ -65,6 +65,10 @@ jobs:
version: 0.260.0
- name: Install the project
run: uv sync --locked --extra dev
- name: Install Databricks Connect
run: |
uv pip uninstall pyspark
uv pip install databricks-connect==16.3.5
- name: Check Databricks CLI
run: databricks current-user me
- name: Run tests
50 changes: 28 additions & 22 deletions README.md
@@ -2,20 +2,18 @@

This project is an example implementation of a [Databricks Asset Bundle](https://docs.databricks.com/aws/en/dev-tools/bundles/) using a [Databricks Free Edition](https://www.databricks.com/learn/free-edition) workspace.

The project ist configured using `pyproject.toml` (Python specifics) and `databricks.yaml` (Databricks Bundle specifics) and uses [uv](https://docs.astral.sh/uv/) to manage the Python project and dependencies.
The project is configured using `pyproject.toml` (Python specifics) and `databricks.yaml` (Databricks Bundle specifics) and uses [uv](https://docs.astral.sh/uv/) to manage the Python project and dependencies.

## Repo Overview
## Repository Structure

* `.github/workflows`: CI/CD jobs to test and dpeloy bundle
* `dab_project`: Python project (Used in Databricks Workflow as Python-Wheel-Task)
* `dbt`: [dbt](https://github.com/dbt-labs/dbt-core) project (Used in Databricks Workflow as dbt-Task)
* dbt-Models used from https://github.com/dbt-labs/jaffle_shop_duckdb
* `resources`: Resources such as Databricks Workflows or Databricks Volumes/Schemas
* Python-based workflow: https://docs.databricks.com/aws/en/dev-tools/bundles/python
* YAML-based Workflow: https://docs.databricks.com/aws/en/dev-tools/bundles/resources#job
* `scripts`: Python script to setup groups, service principals and catalogs used in a Databricks (Free Edition) workspace
* `tests`: Unit-tests running on Databricks (via Connect) or locally
* Used in [ci.yml](.github/workflows/ci.yml) jobs
| Directory | Description |
|-----------|-------------|
| `.github/workflows` | CI/CD jobs to test and deploy bundle |
| `dab_project` | Python project (Used in Databricks Workflow as Python-Wheel-Task) |
| `dbt` | [dbt](https://github.com/dbt-labs/dbt-core) project<br/>* Used in Databricks Workflow as dbt-Task<br/>* dbt-Models used from https://github.com/dbt-labs/jaffle_shop_duckdb |
| `resources` | Resources such as Databricks Workflows or Databricks Volumes/Schemas<br/>* Python-based workflow: https://docs.databricks.com/aws/en/dev-tools/bundles/python<br/>* YAML-based Workflow: https://docs.databricks.com/aws/en/dev-tools/bundles/resources#job |
| `scripts` | Python script to setup groups, service principals and catalogs used in a Databricks (Free Edition) workspace |
| `tests` | Unit-tests running on Databricks (via Connect) or locally<br/>* Used in [ci.yml](.github/workflows/ci.yml) jobs |

## Databricks Workspace

@@ -52,7 +50,7 @@ Sync entire `uv` environment with dev dependencies:
uv sync --extra dev
```

> **Note:** `dev` uses Databricks Connect, while `dev_local` uses local Spark
> **Note:** We install Databricks Connect in a follow-up step.

#### (Optional) Activate virtual environment

@@ -66,30 +64,38 @@ Windows:
.venv\Scripts\activate
```

### Databricks Connect

Install `databricks-connect` in the active environment. This requires authentication to be configured via the Databricks CLI.

```bash
uv pip uninstall pyspark
uv pip install databricks-connect==16.3.5
```
> **Note:** The pinned version matches Databricks Runtime 16.3.

See https://docs.databricks.com/aws/en/dev-tools/vscode-ext/ for using Databricks Connect extension in VS Code.

### Unit-Tests

```bash
uv run pytest -v
```

Based on whether Databricks Connect (the `dev` default) is enabled or not the Unit-Tests try to use a Databricks Cluster or start a local Spark session with Delta support.
* On Databricks the unit-tests currently assume the catalog `unit_tests` exists (not ideal).
Depending on whether Databricks Connect is installed, the unit tests either use a Databricks cluster or start a local Spark session with Delta support.
* On Databricks the unit-tests currently assume the catalog `lake_dev` exists.

> **Note:** Local Spark requires Java. On Windows, Spark/Delta needs Hadoop libraries and generally does not run well; use `wsl` instead.

### Checks

```bash
# Linting
ruff check --fix
uv run ruff check --fix
# Formatting
ruff format
uv run ruff format
```

### Databricks Connect

See https://docs.databricks.com/aws/en/dev-tools/vscode-ext/ for using Databricks Connect extension in VS Code.

### Setup Databricks Workspace

The following script sets up a Databricks (Free Edition) Workspace for this project with additional catalogs, groups and service principals. It uses both Databricks-SDK and Databricks Connect (Serverless).
@@ -150,7 +156,7 @@ uv run ./scripts/setup_workspace.py
The `dbt` project is based on https://github.com/dbt-labs/jaffle_shop_duckdb with following changes:

* Schema bronze, silver, gold
* document materialization `use_materialization_v2`
* documented materialization `use_materialization_v2`
* Primary, Foreign Key Constraints

## TODO:
16 changes: 1 addition & 15 deletions pyproject.toml
@@ -21,21 +21,7 @@ dependencies = [

[project.optional-dependencies]
dev = [
# Databricks Runtime (connect includes delta/pyspark)
"databricks-connect~=16.3.0",
"pydantic==2.8.2",
# dbt
"dbt-databricks~=1.10.0",
# Tooling
"databricks-bundles~=0.260.0", # For Python-based Workflows
"mypy", # Type hints
"pip", # Databricks extension needs it
"pytest", # Unit testing
"ruff", # Linting/Formatting
]
# Is this really needed?
dev_local = [
# Databricks Runtime (connect includes delta/pyspark)
# Runtime
"delta-spark>=3.3.0, <4.0.0",
"pydantic==2.8.2",
"pyspark>=3.5.0, <4.0.0",
3 changes: 2 additions & 1 deletion resources/constants.py
@@ -21,7 +21,8 @@ class Variables:
DEFAULT_ENVIRONMENT = JobEnvironment(
environment_key="default",
spec=Environment(
environment_version=Variables.serverless_environment_version, dependencies=["./dist/*.whl"]
environment_version=Variables.serverless_environment_version,
dependencies=["./dist/dab_project*.whl"],
),
)
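The narrowed glob avoids picking up unrelated wheels from `dist/`. A quick sketch with hypothetical filenames (not taken from the repository) shows the difference:

```python
from fnmatch import fnmatch

# Hypothetical contents of dist/: the project wheel plus a stray artifact.
wheels = [
    "dab_project-0.1.0-py3-none-any.whl",
    "other_package-1.0.0-py3-none-any.whl",
]

# The old pattern matched every wheel in dist/.
assert [w for w in wheels if fnmatch(w, "*.whl")] == wheels

# The new pattern matches only the project's own wheel.
assert [w for w in wheels if fnmatch(w, "dab_project*.whl")] == [
    "dab_project-0.1.0-py3-none-any.whl"
]
```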

36 changes: 31 additions & 5 deletions tests/conftest.py
@@ -1,5 +1,6 @@
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Generator, Optional

@@ -23,7 +24,7 @@ def spark() -> Generator[SparkSession, None, None]:
yield spark
else:
# If databricks-connect is not installed, we use a local Spark session
warehouse_dir = tempfile.TemporaryDirectory().name
warehouse_dir = tempfile.mkdtemp()
_builder = (
SparkSession.builder.master("local[*]")
.config("spark.hive.metastore.warehouse.dir", Path(warehouse_dir).as_uri())
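The switch from `tempfile.TemporaryDirectory().name` to `tempfile.mkdtemp()` fixes a subtle lifetime bug: the unreferenced `TemporaryDirectory` object is finalized right away (on CPython, as soon as its refcount drops to zero), which deletes the directory before Spark can use it. A minimal stdlib demonstration:

```python
import os
import tempfile

# Buggy: the TemporaryDirectory object is not kept alive, so its
# finalizer removes the directory right after .name is read (CPython).
leaked = tempfile.TemporaryDirectory().name

# Correct: mkdtemp creates the directory and leaves cleanup to the caller.
kept = tempfile.mkdtemp()

print(os.path.exists(leaked), os.path.exists(kept))
os.rmdir(kept)
```

On CPython this typically prints `False True`; exact finalization timing is an implementation detail, which is why holding a reference (or using `mkdtemp`) matters.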
@@ -46,13 +47,38 @@ def spark() -> Generator[SparkSession, None, None]:


@pytest.fixture(scope="session")
def catalog_name() -> Generator[Optional[str], None, None]:
def catalog_name() -> Optional[str]:
"""Fixture to provide the catalog name for tests.

In Databricks, we use the "unit_tests" catalog.
In Databricks, we use the "lake_dev" catalog.
Locally we run without a catalog, so we return None.
"""
if DATABRICKS_CONNECT_AVAILABLE:
yield "unit_tests"
return "lake_dev"
else:
yield None
return None


@pytest.fixture(scope="module")
def create_schema(spark, catalog_name, request) -> Generator[str, None, None]:
"""Fixture to provide a schema for tests.

    Creates a schema whose name combines the test module name with a random suffix, and drops it after the tests complete.
"""
module_name = request.module.__name__.split(".")[-1] # Get just the module name without path
schema_name = f"pytest_{module_name}_{uuid.uuid4().hex[:8]}"

if catalog_name is not None:
full_schema_name = f"{catalog_name}.{schema_name}"
else:
full_schema_name = schema_name

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {full_schema_name}")
yield schema_name
spark.sql(f"DROP SCHEMA IF EXISTS {full_schema_name} CASCADE")
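The fixture's naming scheme can be sketched in isolation; the helper name below is illustrative, not part of the project:

```python
import uuid


def unique_schema_name(module_name: str) -> str:
    """Build a schema name the way the fixture does:
    pytest_<module>_<8 random hex chars>."""
    return f"pytest_{module_name}_{uuid.uuid4().hex[:8]}"


print(unique_schema_name("test_delta"))
```

The random suffix lets the same module's tests run concurrently against one catalog without colliding, and the `CASCADE` in the teardown drops any tables left inside the schema.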


@pytest.fixture(scope="function")
def table_name(request) -> str:
"""Fixture to provide a table name based on the test function name."""
return request.node.name
10 changes: 5 additions & 5 deletions tests/test_base_task.py
@@ -21,18 +21,18 @@ def _perform_task(self, catalog_name: str) -> None:
return Task.create_task_factory("TestTask")


def test_etl_task_run(spark, catalog_name, request):
def test_etl_task_run(spark, catalog_name, create_schema, table_name):
task = generate_test_task(
schema_name=__name__,
table_name=f"table_{request.node.name}",
schema_name=create_schema,
table_name=table_name,
)
task.run(catalog_name)

# Verify that the data was written to the Delta table
delta_table = DeltaWorker(
catalog_name=catalog_name,
schema_name=__name__,
table_name=f"table_{request.node.name}",
schema_name=create_schema,
table_name=table_name,
)

assert task.get_class_name() == "TestTask"
6 changes: 3 additions & 3 deletions tests/test_delta.py
@@ -13,7 +13,7 @@
# spark.sql(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")


def test_deltawriter_create_table_if_not_exists(spark, catalog_name, request):
def test_deltawriter_create_table_if_not_exists(spark, catalog_name, create_schema, table_name):
schema = T.StructType(
[
T.StructField("key", T.IntegerType()),
@@ -22,8 +22,8 @@ def test_deltawriter_create_table_if_not_exists(spark, catalog_name, request):
)
delta_writer = DeltaWorker(
catalog_name=catalog_name,
schema_name=__name__,
table_name=f"table_{request.node.name}",
schema_name=create_schema,
table_name=table_name,
)

delta_writer.drop_table_if_exists()