36 commits
1f27f62
pydabs for job with task values and job with for each
lorenzorubi-db Nov 17, 2025
46422fe
adding conditional execution and file arrival scripts
zanitarahimi Nov 18, 2025
f841d1c
adding conditional execution and file arrival scripts
zanitarahimi Nov 18, 2025
0d789ab
pydabs programmatic generation -initial version
lorenzorubi-db Nov 21, 2025
e66e1b6
programmatic customization of pydabs
lorenzorubi-db Nov 24, 2025
d9b3b3b
typo
lorenzorubi-db Nov 24, 2025
6d7f66d
Merge branch 'databricks:main' into pydabs-patterns-airflow
lorenzorubi-db Dec 11, 2025
01e32ca
ruff reformatting
lorenzorubi-db Dec 11, 2025
236c688
update task value example
zanitarahimi Feb 13, 2026
54a2dfa
update file arrival trigger example
zanitarahimi Feb 13, 2026
02cad29
add table update trigger example
zanitarahimi Feb 13, 2026
545e1dd
update read me with new file names
zanitarahimi Feb 13, 2026
79304c3
update conditional execution example
zanitarahimi Feb 13, 2026
fdb1bf6
update check quality notebook
zanitarahimi Feb 13, 2026
66d3de8
update for each example
zanitarahimi Feb 13, 2026
c4cddde
add job backfill example
zanitarahimi Feb 13, 2026
eb68a4d
Merge pull request #1 from lorenzorubi-db/pydabs-patterns-airflow
zanitarahimi Feb 13, 2026
020e4f9
Merge branch 'databricks:main' into main
zanitarahimi Feb 13, 2026
285ffde
second last review
lorenzorubi-db Feb 15, 2026
8caf45d
update job file arrival
zanitarahimi Feb 16, 2026
19a9ef5
update the table update trigger example
zanitarahimi Feb 16, 2026
8e2e907
quick fix: typo in read me
zanitarahimi Feb 16, 2026
3bca825
update task values example
zanitarahimi Feb 16, 2026
47e1d1d
change ipynb to py scripts
zanitarahimi Feb 16, 2026
34a8b85
change ipynb to py scripts
zanitarahimi Feb 16, 2026
51e25ac
change ipynb to py scripts file arrival
zanitarahimi Feb 16, 2026
1584bfe
change ipynb to py scripts table update
zanitarahimi Feb 16, 2026
3af5209
change ipynb to py scripts for each
zanitarahimi Feb 16, 2026
5304fbc
rename for consistency
zanitarahimi Feb 16, 2026
7727917
update readme references for ipynb files
zanitarahimi Feb 16, 2026
6231d6f
ruff formatting
zanitarahimi Feb 16, 2026
a862d54
Merge remote-tracking branch 'upstream/main' into pydabs-patterns-air…
lorenzorubi-db Feb 16, 2026
f5a801f
Merge remote-tracking branch 'upstream/main' into pydabs-patterns-air…
lorenzorubi-db Feb 16, 2026
9f0ef8c
Merge branch 'pydabs-patterns-airflow' into quick_fix_file_arrival
lorenzorubi-db Feb 17, 2026
143f154
Merge pull request #3 from lorenzorubi-db/quick_fix_file_arrival
lorenzorubi-db Feb 17, 2026
c2bfa57
final review
lorenzorubi-db Feb 17, 2026
67 changes: 67 additions & 0 deletions knowledge_base/pydabs_job_backfill_data/README.md
@@ -0,0 +1,67 @@
# pydabs_job_backfill_data

This example demonstrates a Databricks Asset Bundles (DABs) job that runs a SQL task with a date parameter for backfilling data.

The Job consists of:

1. **run_daily_sql** — A SQL task that runs `src/my_query.sql` with a `run_date` job parameter. The query inserts data from a source table into a target table filtered by `event_date = run_date`, so you can backfill or reprocess specific dates.

* `src/`: SQL source code for this project.
* `src/my_query.sql`: Daily insert query that uses the `:run_date` parameter to filter by event date.
* `resources/`: Resource configurations (jobs, pipelines, etc.)
* `resources/backfill_data.py`: PyDABs job definition with a parameterized SQL task.

## Job parameters

| Parameter | Default | Description |
|------------|-------------|--------------------------------------|
| `run_date` | `2024-01-01` | Date used to filter data (e.g. `event_date`). |

Before deploying, set `warehouse_id` in `resources/backfill_data.py` to your SQL warehouse ID, and adjust the catalog/schema/table names in `src/my_query.sql` to match your environment.

## Getting started

Choose how you want to work on this project:

(a) Directly in your Databricks workspace, see
https://docs.databricks.com/dev-tools/bundles/workspace.

(b) Locally with an IDE like Cursor or VS Code, see
https://docs.databricks.com/vscode-ext.

(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html

If you're developing with an IDE, dependencies for this project should be installed using uv:

* Make sure you have the UV package manager installed.
It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
* Run `uv sync --dev` to install the project's dependencies.

## Using this project with the CLI

The Databricks workspace and IDE extensions provide a graphical interface for working
with this project. You can also use the CLI:

1. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks configure
```

2. To deploy a development copy of this project, run:
```
$ databricks bundle deploy --target dev
```
(Note: "dev" is the default target, so `--target` is optional.)

This deploys everything defined for this project, including the job
`[dev yourname] sql_backfill_example`. You can find it under **Workflows** (or **Jobs & Pipelines**) in your workspace.

3. To run the job with the default `run_date`:
```
$ databricks bundle run sql_backfill_example
```

4. To run the job for a specific date (e.g. backfill):
```
$ databricks bundle run sql_backfill_example --parameters run_date=2024-02-01
```
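To backfill a range of dates, you can trigger one run per date. Below is a minimal sketch using the Databricks SDK for Python (not part of this bundle); it assumes the `databricks-sdk` package is available locally, the job has already been deployed, and the dev-prefixed job name and date range shown are placeholders to adapt to your environment:

```python
# backfill_range.py (hypothetical helper script, run locally after `databricks bundle deploy`)
from datetime import date, timedelta

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # reuses the authentication set up with `databricks configure`

# Look up the deployed job by its development-mode name (adjust "yourname" to your username).
job = next(w.jobs.list(name="[dev yourname] sql_backfill_example"))

start, end = date(2024, 2, 1), date(2024, 2, 7)
current = start
while current <= end:
    # Each run overrides the run_date job parameter, like `--parameters run_date=...` on the CLI.
    w.jobs.run_now(job_id=job.job_id, job_parameters={"run_date": current.isoformat()})
    current += timedelta(days=1)
```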
21 changes: 21 additions & 0 deletions knowledge_base/pydabs_job_backfill_data/databricks.yml
@@ -0,0 +1,21 @@
# This is a Databricks asset bundle definition for pydabs_job_backfill_data.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: pydabs_job_backfill_data

python:
venv_path: .venv
# Functions called to load resources defined in Python. See resources/__init__.py
resources:
- "resources:load_resources"

include:
- resources/*.yml
- resources/*/*.yml

targets:
dev:
mode: development
default: true
workspace:
host: https://myworkspace.databricks.com
26 changes: 26 additions & 0 deletions knowledge_base/pydabs_job_backfill_data/pyproject.toml
@@ -0,0 +1,26 @@
[project]
name = "pydabs_job_backfill_data"
version = "0.0.1"
authors = [{ name = "Databricks Field Engineering" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
# Any dependencies for jobs and pipelines in this project can be added here
# See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
#
# LIMITATION: for pipelines, dependencies are cached during development;
# add dependencies to the 'environment' section of pipeline.yml file instead
]

[dependency-groups]
dev = [
"pytest",
"databricks-connect>=15.4,<15.5",
"databricks-bundles==0.275.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
16 changes: 16 additions & 0 deletions knowledge_base/pydabs_job_backfill_data/resources/__init__.py
@@ -0,0 +1,16 @@
from databricks.bundles.core import (
Bundle,
Resources,
load_resources_from_current_package_module,
)


def load_resources(bundle: Bundle) -> Resources:
"""
The 'load_resources' function is referenced in databricks.yml and is responsible for loading
bundle resources defined in Python code. It is called by the Databricks CLI during
bundle deployment. After deployment, this function is not used.
"""

# the default implementation loads all Python files in 'resources' directory
return load_resources_from_current_package_module()
24 changes: 24 additions & 0 deletions knowledge_base/pydabs_job_backfill_data/resources/backfill_data.py
@@ -0,0 +1,24 @@
from databricks.bundles.jobs import (
Job,
Task,
SqlTask,
SqlTaskFile,
JobParameterDefinition,
)

# SQL task that runs src/my_query.sql, forwarding the job-level run_date parameter to the query
run_daily_sql = Task(
task_key="run_daily_sql",
sql_task=SqlTask(
warehouse_id="<your_warehouse_id>",
file=SqlTaskFile(path="src/my_query.sql"),
parameters={"run_date": "{{job.parameters.run_date}}"},
),
)

# Job with a run_date parameter (default 2024-01-01); override it at run time to backfill another date
job = Job(
name="sql_backfill_example",
tasks=[run_daily_sql],
parameters=[
JobParameterDefinition(name="run_date", default="2024-01-01"),
],
)
5 changes: 5 additions & 0 deletions knowledge_base/pydabs_job_backfill_data/src/my_query.sql
@@ -0,0 +1,5 @@
-- Referenced by the sql_task in resources/backfill_data.py; :run_date is supplied as a task parameter.
INSERT INTO catalog.schema.target_table
SELECT *
FROM catalog.schema.source_table
WHERE event_date = date(:run_date);
64 changes: 64 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/README.md
@@ -0,0 +1,64 @@
# pydabs_job_conditional_execution

This example demonstrates a Lakeflow Job that uses conditional task execution based on data quality checks.

The Lakeflow Job consists of the following tasks:
1. **check_quality**: checks data quality and records the number of bad records as a task value
2. **branch**: evaluates whether the bad record count exceeds a threshold (100 records)
3. Routes to different processing paths based on the condition outcome:
    - If bad records > 100: runs the `fix_path` task
    - If bad records ≤ 100: runs the `skip_path` task

* `src/`: Notebook source code for this project.
* `src/check_quality.py`: Checks data quality and sets the bad record count as a task value
* `src/fix_path.py`: Handles cases where the bad record count exceeds the threshold
* `src/skip_path.py`: Handles cases where the bad record count is within the threshold
* `resources/`: Resource configurations (jobs, pipelines, etc.)
* `resources/conditional_execution.py`: PyDABs job definition with conditional tasks


## Getting started

Choose how you want to work on this project:

(a) Directly in your Databricks workspace, see
https://docs.databricks.com/dev-tools/bundles/workspace.

(b) Locally with an IDE like Cursor or VS Code, see
https://docs.databricks.com/vscode-ext.

(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html

If you're developing with an IDE, dependencies for this project should be installed using uv:

* Make sure you have the UV package manager installed.
It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
* Run `uv sync --dev` to install the project's dependencies.


## Using this project with the CLI

The Databricks workspace and IDE extensions provide a graphical interface for working
with this project. It's also possible to interact with it directly using the CLI:

1. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks configure
```

2. To deploy a development copy of this project, type:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)

This deploys everything that's defined for this project.
For example, this project will deploy a job called
`[dev yourname] conditional_execution_example` to your workspace.
You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.

3. To run the job, use the "run" command:
```
$ databricks bundle run conditional_execution_example
```
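Since `pytest` is included in the dev dependency group, you can also add a small test that checks the branch wiring before deploying. A sketch; the test file path and importing `resources.conditional_execution` directly are assumptions about your local setup:

```python
# tests/test_conditional_execution.py (hypothetical path)
from resources.conditional_execution import job


def test_branch_outcomes_are_wired():
    tasks = {t.task_key: t for t in job.tasks}

    # The branch task compares the upstream task value against the threshold.
    assert tasks["branch"].condition_task.left == "{{tasks.check_quality.values.bad_records}}"
    assert tasks["branch"].condition_task.right == "100"

    # fix_path runs only when the condition is true, skip_path only when it is false.
    assert tasks["fix_path"].depends_on[0].outcome == "true"
    assert tasks["skip_path"].depends_on[0].outcome == "false"
```

Run it with `uv run pytest`.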
21 changes: 21 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/databricks.yml
@@ -0,0 +1,21 @@
# This is a Databricks asset bundle definition for pydabs_job_conditional_execution.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: pydabs_job_conditional_execution

python:
venv_path: .venv
# Functions called to load resources defined in Python. See resources/__init__.py
resources:
- "resources:load_resources"

include:
- resources/*.yml
- resources/*/*.yml

targets:
dev:
mode: development
default: true
workspace:
host: https://myworkspace.databricks.com
26 changes: 26 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/pyproject.toml
@@ -0,0 +1,26 @@
[project]
name = "pydabs_job_conditional_execution"
version = "0.0.1"
authors = [{ name = "Databricks Field Engineering" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
# Any dependencies for jobs and pipelines in this project can be added here
# See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
#
# LIMITATION: for pipelines, dependencies are cached during development;
# add dependencies to the 'environment' section of pipeline.yml file instead
]

[dependency-groups]
dev = [
"pytest",
"databricks-connect>=15.4,<15.5",
"databricks-bundles==0.275.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
16 changes: 16 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/resources/__init__.py
@@ -0,0 +1,16 @@
from databricks.bundles.core import (
Bundle,
Resources,
load_resources_from_current_package_module,
)


def load_resources(bundle: Bundle) -> Resources:
"""
The 'load_resources' function is referenced in databricks.yml and is responsible for loading
bundle resources defined in Python code. It is called by the Databricks CLI during
bundle deployment. After deployment, this function is not used.
"""

# the default implementation loads all Python files in 'resources' directory
return load_resources_from_current_package_module()
43 changes: 43 additions & 0 deletions knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py
@@ -0,0 +1,43 @@
from databricks.bundles.jobs import (
Job,
Task,
NotebookTask,
ConditionTask,
ConditionTaskOp,
TaskDependency,
)

# 1) Producer task: runs a notebook and emits a task value
check_quality = Task(
task_key="check_quality",
notebook_task=NotebookTask(notebook_path="src/branch/check_quality.py"),
)

# 2) Branch task: evaluates an expression using an upstream task value
branch = Task(
task_key="branch",
condition_task=ConditionTask(
left="{{tasks.check_quality.values.bad_records}}",
op=ConditionTaskOp.GREATER_THAN,
right="100",
),
depends_on=[TaskDependency(task_key="check_quality")],
)

# 3) Downstream tasks: gated on the condition outcome
fix_path = Task(
task_key="fix_path",
notebook_task=NotebookTask(notebook_path="src/branch/fix_path.py"),
depends_on=[TaskDependency(task_key="branch", outcome="true")],
)

skip_path = Task(
task_key="skip_path",
notebook_task=NotebookTask(notebook_path="src/branch/skip_path.py"),
depends_on=[TaskDependency(task_key="branch", outcome="false")],
)

job = Job(
name="conditional_execution_example",
tasks=[check_quality, branch, fix_path, skip_path],
)
@@ -0,0 +1,5 @@
# Databricks notebook source
from databricks.sdk.runtime import dbutils

bad_records = 123 # result of a data quality check
dbutils.jobs.taskValues.set(key="bad_records", value=bad_records)
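The `branch` condition task reads this value through the `{{tasks.check_quality.values.bad_records}}` reference. A downstream notebook such as `fix_path.py` could also read it directly; a sketch, assuming the producing task keeps the `check_quality` task key:

```python
# Databricks notebook source
# Hypothetical addition to a downstream notebook (e.g. fix_path.py)
from databricks.sdk.runtime import dbutils

bad_records = dbutils.jobs.taskValues.get(
    taskKey="check_quality", key="bad_records", default=0, debugValue=0
)
print(f"Fixing {bad_records} bad records...")
```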
@@ -0,0 +1,2 @@
# Databricks notebook source
print("Processing Files...")
@@ -0,0 +1,2 @@
# Databricks notebook source
print("Skipping Files...")