diff --git a/knowledge_base/pydabs_job_backfill_data/README.md b/knowledge_base/pydabs_job_backfill_data/README.md new file mode 100644 index 0000000..0bdbf89 --- /dev/null +++ b/knowledge_base/pydabs_job_backfill_data/README.md @@ -0,0 +1,67 @@ +# pydabs_job_backfill_data + +This example demonstrates a Databricks Asset Bundle (DABs) Job that runs a SQL task with a date parameter for backfilling data. + +The Job consists of: + +1. **run_daily_sql** — A SQL task that runs `src/my_query.sql` with a `run_date` job parameter. The query inserts data from a source table into a target table filtered by `event_date = run_date`, so you can backfill or reprocess specific dates. + +* `src/`: SQL and notebook source code for this project. + * `src/my_query.sql`: Daily insert query that uses the `:run_date` parameter to filter by event date. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/backfill_data.py`: PyDABs job definition with a parameterized SQL task. + +## Job parameters + +| Parameter | Default | Description | +|------------|-------------|--------------------------------------| +| `run_date` | `2024-01-01` | Date used to filter data (e.g. `event_date`). | + +Before deploying, set `warehouse_id` in `resources/backfill_data.py` to your SQL warehouse ID, and adjust the catalog/schema/table names in `src/my_query.sql` to match your environment. + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. 
+ It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + +## Using this project with the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. You can also use the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, run: + ``` + $ databricks bundle deploy --target dev + ``` + (Note: "dev" is the default target, so `--target` is optional.) + + This deploys everything defined for this project, including the job + `[dev yourname] sql_backfill_example`. You can find it under **Workflows** (or **Jobs & Pipelines**) in your workspace. + +3. To run the job with the default `run_date`: + ``` + $ databricks bundle run sql_backfill_example + ``` + +4. To run the job for a specific date (e.g. backfill): + ``` + $ databricks bundle run sql_backfill_example --params run_date=2024-02-01 + ``` diff --git a/knowledge_base/pydabs_job_backfill_data/databricks.yml b/knowledge_base/pydabs_job_backfill_data/databricks.yml new file mode 100644 index 0000000..f5317ff --- /dev/null +++ b/knowledge_base/pydabs_job_backfill_data/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_job_backfill_data. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_backfill_data + +python: + venv_path: .venv + # Functions called to load resources defined in Python. 
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_backfill_data/pyproject.toml b/knowledge_base/pydabs_job_backfill_data/pyproject.toml new file mode 100644 index 0000000..15f1f3e --- /dev/null +++ b/knowledge_base/pydabs_job_backfill_data/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_backfill_data" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_backfill_data/resources/__init__.py b/knowledge_base/pydabs_job_backfill_data/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_backfill_data/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_backfill_data/resources/backfill_data.py b/knowledge_base/pydabs_job_backfill_data/resources/backfill_data.py new file mode 100644 index 0000000..2526a12 --- /dev/null +++ b/knowledge_base/pydabs_job_backfill_data/resources/backfill_data.py @@ -0,0 +1,24 @@ +from databricks.bundles.jobs import ( + Job, + Task, + SqlTask, + SqlTaskFile, + JobParameterDefinition, +) + +run_daily_sql = Task( + task_key="run_daily_sql", + sql_task=SqlTask( + warehouse_id="", + file=SqlTaskFile(path="src/my_query.sql"), + parameters={"run_date": "{{job.parameters.run_date}}"}, + ), +) + +job = Job( + name="sql_backfill_example", + tasks=[run_daily_sql], + parameters=[ + JobParameterDefinition(name="run_date", default="2024-01-01"), + ], +) diff --git a/knowledge_base/pydabs_job_backfill_data/src/my_query.sql b/knowledge_base/pydabs_job_backfill_data/src/my_query.sql new file mode 100644 index 0000000..a71393e --- /dev/null +++ b/knowledge_base/pydabs_job_backfill_data/src/my_query.sql @@ -0,0 +1,5 @@ +-- referenced by sql_task +INSERT INTO catalog.schema.target_table +SELECT * +FROM catalog.schema.source_table +WHERE event_date = date(:run_date); \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/README.md b/knowledge_base/pydabs_job_conditional_execution/README.md new file mode 100644 index 0000000..c73e613 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/README.md @@ -0,0 +1,64 @@ +# pydabs_job_conditional_execution + +This example demonstrates a Lakeflow Job that uses conditional task execution based on data quality checks. + +The Lakeflow Job consists of following tasks: +1. Checks data quality and calculates bad records +2. Evaluates if bad records exceed a threshold (100 records) +3. 
Routes to different processing paths based on the condition: + - If bad records > 100: runs `fix_path` task + - If bad records ≤ 100: runs `skip_path` task + +* `src/`: Notebook source code for this project. + * `src/branch/check_quality.py`: Checks data quality and outputs bad record count + * `src/branch/fix_path.py`: Handles cases with high bad record count + * `src/branch/skip_path.py`: The skip path +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/conditional_execution.py`: PyDABs job definition with conditional tasks + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +## Using this project with the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] conditional_execution_example` to your workspace. 
+ You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. To run the job, use the "run" command: + ``` + $ databricks bundle run conditional_execution_example + ``` \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/databricks.yml b/knowledge_base/pydabs_job_conditional_execution/databricks.yml new file mode 100644 index 0000000..55aa093 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_job_conditional_execution. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_conditional_execution + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_conditional_execution/pyproject.toml b/knowledge_base/pydabs_job_conditional_execution/pyproject.toml new file mode 100644 index 0000000..44bbd65 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_conditional_execution" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires 
= ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py b/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. + """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py new file mode 100644 index 0000000..4598fe3 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/resources/conditional_execution.py @@ -0,0 +1,43 @@ +from databricks.bundles.jobs import ( + Job, + Task, + NotebookTask, + ConditionTask, + ConditionTaskOp, + TaskDependency, +) + +# 1) Producer task: runs a notebook and emits a task value +check_quality = Task( + task_key="check_quality", + notebook_task=NotebookTask(notebook_path="src/branch/check_quality.py"), +) + +# 2) Branch task: evaluates an expression using an upstream task value +branch = Task( + task_key="branch", + condition_task=ConditionTask( + left="{{tasks.check_quality.values.bad_records}}", + op=ConditionTaskOp.GREATER_THAN, + right="100", + ), + depends_on=[TaskDependency(task_key="check_quality")], +) + +# 3) Downstream tasks: gated on the condition outcome +fix_path = 
Task( + task_key="fix_path", + notebook_task=NotebookTask(notebook_path="src/branch/fix_path.py"), + depends_on=[TaskDependency(task_key="branch", outcome="true")], +) + +skip_path = Task( + task_key="skip_path", + notebook_task=NotebookTask(notebook_path="src/branch/skip_path.py"), + depends_on=[TaskDependency(task_key="branch", outcome="false")], +) + +job = Job( + name="conditional_execution_example", + tasks=[check_quality, branch, fix_path, skip_path], +) diff --git a/knowledge_base/pydabs_job_conditional_execution/src/branch/check_quality.py b/knowledge_base/pydabs_job_conditional_execution/src/branch/check_quality.py new file mode 100644 index 0000000..f583e5f --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/branch/check_quality.py @@ -0,0 +1,5 @@ +# Databricks notebook source +from databricks.sdk.runtime import dbutils + +bad_records = 123 # result of a data quality check +dbutils.jobs.taskValues.set(key="bad_records", value=bad_records) diff --git a/knowledge_base/pydabs_job_conditional_execution/src/branch/fix_path.py b/knowledge_base/pydabs_job_conditional_execution/src/branch/fix_path.py new file mode 100644 index 0000000..46c7e62 --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/branch/fix_path.py @@ -0,0 +1,2 @@ +# Databricks notebook source +print("Processing Files...") diff --git a/knowledge_base/pydabs_job_conditional_execution/src/branch/skip_path.py b/knowledge_base/pydabs_job_conditional_execution/src/branch/skip_path.py new file mode 100644 index 0000000..d63152d --- /dev/null +++ b/knowledge_base/pydabs_job_conditional_execution/src/branch/skip_path.py @@ -0,0 +1,2 @@ +# Databricks notebook source +print("Skipping Files...") diff --git a/knowledge_base/pydabs_job_file_arrival/README.md b/knowledge_base/pydabs_job_file_arrival/README.md new file mode 100644 index 0000000..da1fdc4 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/README.md @@ -0,0 +1,67 @@ +# pydabs_job_file_arrival + +This 
example demonstrates a Lakeflow Job that uses file arrival triggers to automatically process new files when they arrive in a Unity Catalog Volume. + +The Lakeflow Job is configured with: +- **File arrival trigger**: Monitors a Unity Catalog Volume (root or subpath) for new files, recursively. +- **Configurable wait times**: + - Minimum time between triggers: 60 seconds + - Wait after last file change: 90 seconds (ensures file write is complete) +- **Automatic processing**: When files are detected, the job automatically runs and processes them + +* `src/`: Notebook source code for this project. + * `src/files/process_files.py`: Processes newly arrived files from the volume path. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/file_arrival.py`: PyDABs job with file arrival trigger configuration. + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +# Using this project using the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. 
To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] file_arrival_example` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Development vs. Production behavior + - Dev target (mode: development): Schedules and automatic triggers are disabled by design, so the job will not auto-fire on file arrival. Use manual runs to test the logic. + You can also manually run it with: + + ``` + $ databricks bundle run file_arrival_example + ``` + - Prod target (mode: production): Automatic triggers are active. Uploading a file to the configured Unity Catalog Volume path will trigger the job run when the trigger evaluates. + \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/databricks.yml b/knowledge_base/pydabs_job_file_arrival/databricks.yml new file mode 100644 index 0000000..80dff3c --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_job_file_arrival. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_file_arrival + +python: + venv_path: .venv + # Functions called to load resources defined in Python. 
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_file_arrival/pyproject.toml b/knowledge_base/pydabs_job_file_arrival/pyproject.toml new file mode 100644 index 0000000..cf126d2 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_file_arrival" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_file_arrival/resources/__init__.py b/knowledge_base/pydabs_job_file_arrival/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py new file mode 100644 index 0000000..c67f392 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/resources/file_arrival.py @@ -0,0 +1,24 @@ +from databricks.bundles.jobs import ( + FileArrivalTriggerConfiguration, + Job, + Task, + TriggerSettings, + NotebookTask, +) + +process_files = Task( + task_key="process_files", + notebook_task=NotebookTask(notebook_path="src/files/process_files.py"), +) + +job = Job( + name="file_arrival_example", + trigger=TriggerSettings( + file_arrival=FileArrivalTriggerConfiguration( + url="/Volumes/main/raw/incoming/", # UC volume or external location + min_time_between_triggers_seconds=60, + wait_after_last_change_seconds=90, + ) + ), + tasks=[process_files], +) diff --git a/knowledge_base/pydabs_job_file_arrival/src/files/process_files.py b/knowledge_base/pydabs_job_file_arrival/src/files/process_files.py new file mode 100644 index 0000000..d657db1 --- /dev/null +++ b/knowledge_base/pydabs_job_file_arrival/src/files/process_files.py @@ -0,0 +1,9 @@ +# Databricks notebook source +from pyspark.sql import functions as F + +df = ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "csv") + .option("cloudFiles.schemaLocation", "/tmp/autoloader/_checkpoint/my_stream") + .load("/Volumes/main/raw/incoming") +) diff --git a/knowledge_base/pydabs_job_programmatic_generation/README.md b/knowledge_base/pydabs_job_programmatic_generation/README.md new file mode 100644 index 0000000..e124406 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/README.md @@ -0,0 +1,69 @@ +# pydabs_job_programmatic_generation + +This example demonstrates a simple Databricks job with programmatic generation and customization. 
+ +* `src/`: Python source code for this project. + * `src/pydabs_job_programmatic_generation/`: Shared Python code that can be used by jobs and pipelines. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +## Using this project with the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project deploys one job per SQL file in `src/`, such as + `[dev yourname] src/tpcds_query1.sql`, to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the default job from the template has a schedule that runs every day + (defined in resources/sample_job.job.yml). 
The schedule + is paused when deploying in development mode (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_programmatic_generation/databricks.yml b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml new file mode 100644 index 0000000..263c202 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/databricks.yml @@ -0,0 +1,34 @@ +# This is a Databricks asset bundle definition for pydabs_job_programmatic_generation. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_programmatic_generation + uuid: 3874a19c-7ea5-401d-bca2-9bd1f9d3efbf + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + mutators: + - 'mutators:add_email_notifications' + +include: + - resources/*.yml + - resources/*/*.yml + +resources: + sql_warehouses: + twoxs_warehouse: + name: Serverless Starter Warehouse + cluster_size: 2X-Small + auto_stop_mins: 10 + +targets: + dev: + mode: development + default: true + + prod: + mode: production + workspace: + root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target} diff --git a/knowledge_base/pydabs_job_programmatic_generation/mutators.py b/knowledge_base/pydabs_job_programmatic_generation/mutators.py new file mode 100644 index 0000000..0ef6172 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/mutators.py @@ -0,0 +1,18 @@ +from dataclasses import replace + +from databricks.bundles.core import Bundle, job_mutator +from databricks.bundles.jobs import Job, JobEmailNotifications + + +@job_mutator +def add_email_notifications(bundle: Bundle, job: Job) -> Job: + if bundle.target == "dev": 
+ return job + + email_notifications = JobEmailNotifications.from_dict( + { + "on_failure": ["${workspace.current_user.userName}"], + } + ) + + return replace(job, email_notifications=email_notifications) diff --git a/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml b/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml new file mode 100644 index 0000000..6bb9e9f --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_programmatic_generation" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py new file mode 100644 index 0000000..c6b6516 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/resources/__init__.py @@ -0,0 +1,58 @@ +import os +import glob + +from databricks.bundles.core import ( + Bundle, + Resources, +) +from databricks.bundles.schemas import Schema + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ + the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() + + Here a job is created for every notebook in the src folder + Plus a schema for the dev environment, to have one schema per user deploying the Job + """ + resources = Resources() + + target_schema_name = "target_prod_schema" # this is the schema name for prod - should be deployed with Terraform + + if bundle.target == "dev": + # create 1 schema per user in other environments + # note databricks.yml: the target dev is mode "development" + schema = Schema( + catalog_name="main", + name="prog_gen_target", + comment="Schema for output data", + ) + resources.add_schema(resource_name="project_schema", schema=schema) + target_schema_name = "${resources.schemas.project_schema.name}" + + for file in glob.glob("src/*.sql", recursive=True): + resources.add_job( + resource_name=os.path.basename(file).removesuffix(".sql"), + job={ + "name": file, + "tasks": [ + { + "task_key": "create_table", + "sql_task": { + "parameters": {"target_schema": target_schema_name}, + "file": { + "path": file, + }, + "warehouse_id": "${resources.sql_warehouses.twoxs_warehouse.id}", + }, + } + ], + }, + ) + + return resources diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py new file mode 100644 index 0000000..cd9ac48 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/pydabs_job_programmatic_generation/main.py @@ -0,0 +1,6 @@ +def main(): + pass + + +if __name__ == "__main__": + main() diff --git 
a/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql new file mode 100644 index 0000000..7dfcf36 --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query1.sql @@ -0,0 +1,42 @@ +USE CATALOG samples; +USE SCHEMA tpcds_sf1; + +CREATE OR REPLACE TABLE IDENTIFIER( + 'main.' || :target_schema || '.tpcds_query1' +) AS + WITH customer_total_return AS ( + SELECT + sr_customer_sk AS ctr_customer_sk, + sr_store_sk AS ctr_store_sk, + SUM(sr_return_amt) AS ctr_total_return + FROM + store_returns, + date_dim + WHERE + sr_returned_date_sk = d_date_sk + AND d_year = 2001 + GROUP BY + sr_customer_sk, + sr_store_sk + ) + SELECT + c_customer_id + FROM + customer_total_return ctr1, + store, + customer + WHERE + ctr1.ctr_total_return > ( + SELECT + AVG(ctr_total_return) * 1.2 + FROM + customer_total_return ctr2 + WHERE + ctr1.ctr_store_sk = ctr2.ctr_store_sk + ) + AND s_store_sk = ctr1.ctr_store_sk + AND s_state = 'TN' + AND ctr1.ctr_customer_sk = c_customer_sk + ORDER BY + c_customer_id + LIMIT 100; \ No newline at end of file diff --git a/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql new file mode 100644 index 0000000..b7b10ac --- /dev/null +++ b/knowledge_base/pydabs_job_programmatic_generation/src/tpcds_query2.sql @@ -0,0 +1,72 @@ +USE CATALOG samples; +USE SCHEMA tpcds_sf1; + +CREATE OR REPLACE TABLE IDENTIFIER( + 'main.' 
|| :target_schema || '.tpcds_query2' +) +TBLPROPERTIES ( + 'delta.columnMapping.mode' = 'name' +) AS +WITH wscs AS ( + SELECT sold_date_sk, sales_price + FROM ( + SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price + FROM web_sales + UNION ALL + SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price + FROM catalog_sales + ) +), +wswscs AS ( + SELECT + d_week_seq, + SUM(CASE WHEN d_day_name = 'Sunday' THEN sales_price ELSE NULL END) AS sun_sales, + SUM(CASE WHEN d_day_name = 'Monday' THEN sales_price ELSE NULL END) AS mon_sales, + SUM(CASE WHEN d_day_name = 'Tuesday' THEN sales_price ELSE NULL END) AS tue_sales, + SUM(CASE WHEN d_day_name = 'Wednesday' THEN sales_price ELSE NULL END) AS wed_sales, + SUM(CASE WHEN d_day_name = 'Thursday' THEN sales_price ELSE NULL END) AS thu_sales, + SUM(CASE WHEN d_day_name = 'Friday' THEN sales_price ELSE NULL END) AS fri_sales, + SUM(CASE WHEN d_day_name = 'Saturday' THEN sales_price ELSE NULL END) AS sat_sales + FROM wscs + JOIN date_dim ON d_date_sk = sold_date_sk + GROUP BY d_week_seq +) +SELECT + d_week_seq1, + ROUND(sun_sales1 / sun_sales2, 2), + ROUND(mon_sales1 / mon_sales2, 2), + ROUND(tue_sales1 / tue_sales2, 2), + ROUND(wed_sales1 / wed_sales2, 2), + ROUND(thu_sales1 / thu_sales2, 2), + ROUND(fri_sales1 / fri_sales2, 2), + ROUND(sat_sales1 / sat_sales2, 2) +FROM ( + SELECT + wswscs.d_week_seq AS d_week_seq1, + sun_sales AS sun_sales1, + mon_sales AS mon_sales1, + tue_sales AS tue_sales1, + wed_sales AS wed_sales1, + thu_sales AS thu_sales1, + fri_sales AS fri_sales1, + sat_sales AS sat_sales1 + FROM wswscs + JOIN date_dim ON date_dim.d_week_seq = wswscs.d_week_seq + WHERE d_year = 1998 +) y +JOIN ( + SELECT + wswscs.d_week_seq AS d_week_seq2, + sun_sales AS sun_sales2, + mon_sales AS mon_sales2, + tue_sales AS tue_sales2, + wed_sales AS wed_sales2, + thu_sales AS thu_sales2, + fri_sales AS fri_sales2, + sat_sales AS sat_sales2 + FROM wswscs + JOIN date_dim ON date_dim.d_week_seq 
= wswscs.d_week_seq + WHERE d_year = 1999 +) z +ON d_week_seq1 = d_week_seq2 - 53 +ORDER BY d_week_seq1; \ No newline at end of file diff --git a/knowledge_base/pydabs_job_table_update_trigger/README.md b/knowledge_base/pydabs_job_table_update_trigger/README.md new file mode 100644 index 0000000..d0061a6 --- /dev/null +++ b/knowledge_base/pydabs_job_table_update_trigger/README.md @@ -0,0 +1,68 @@ +# pydabs_job_table_update_trigger + +This example demonstrates a workflow in which producers write to Unity Catalog tables and consumers trigger on table updates instead of time‑based schedules. + + +The Lakeflow Job is configured with: +- **Table update trigger**: Runs the job when new data is ready, without the need for a continuously running cluster or knowledge of the processes that update a table. +- **Configurable wait times**: + - Minimum time between triggers: 0 seconds + - Wait after last change: 3600 seconds +- **Automatic processing**: When updates are detected, the job automatically runs and processes them. + +* `src/`: Notebook source code for this project. + * `src/assets/consume_table.py`: Notebook that reads the `main.analytics.daily_events` table; add your consumer logic there. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + * `resources/table_update.py`: PyDABs job with table update trigger configuration. + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies.
+ + +## Using this project with the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] table_update_example` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Development vs. Production behavior + - Dev target (mode: development): Schedules and automatic triggers are disabled by design, so the job will not auto-fire on table updates. Use manual runs to test the logic. + You can also manually run it with: + + ``` + $ databricks bundle run table_update_example + ``` + - Prod target (mode: production): Automatic triggers are active. An update to the configured Unity Catalog table (`main.analytics.daily_events`) will trigger the job run when the trigger evaluates. + \ No newline at end of file diff --git a/knowledge_base/pydabs_job_table_update_trigger/databricks.yml b/knowledge_base/pydabs_job_table_update_trigger/databricks.yml new file mode 100644 index 0000000..6e24521 --- /dev/null +++ b/knowledge_base/pydabs_job_table_update_trigger/databricks.yml @@ -0,0 +1,21 @@ +# This is a Databricks asset bundle definition for pydabs_job_table_update_trigger. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_table_update_trigger + +python: + venv_path: .venv + # Functions called to load resources defined in Python.
See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true + workspace: + host: https://myworkspace.databricks.com \ No newline at end of file diff --git a/knowledge_base/pydabs_job_table_update_trigger/pyproject.toml b/knowledge_base/pydabs_job_table_update_trigger/pyproject.toml new file mode 100644 index 0000000..898124d --- /dev/null +++ b/knowledge_base/pydabs_job_table_update_trigger/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_table_update_trigger" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_table_update_trigger/resources/__init__.py b/knowledge_base/pydabs_job_table_update_trigger/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_table_update_trigger/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. 
+ """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_table_update_trigger/resources/table_update.py b/knowledge_base/pydabs_job_table_update_trigger/resources/table_update.py new file mode 100644 index 0000000..d04f77f --- /dev/null +++ b/knowledge_base/pydabs_job_table_update_trigger/resources/table_update.py @@ -0,0 +1,24 @@ +from databricks.bundles.jobs import ( + Job, + NotebookTask, + Task, + TriggerSettings, + TableUpdateTriggerConfiguration, +) + +consume_table = Task( + task_key="consume_table", + notebook_task=NotebookTask(notebook_path="src/assets/consume_table.py"), +) + +job = Job( + name="table_update_example", + trigger=TriggerSettings( + table_update=TableUpdateTriggerConfiguration( + table_names=["main.analytics.daily_events"], + min_time_between_triggers_seconds=0, + wait_after_last_change_seconds=3600, + ) + ), + tasks=[consume_table], +) diff --git a/knowledge_base/pydabs_job_table_update_trigger/src/assets/consume_table.py b/knowledge_base/pydabs_job_table_update_trigger/src/assets/consume_table.py new file mode 100644 index 0000000..69db2be --- /dev/null +++ b/knowledge_base/pydabs_job_table_update_trigger/src/assets/consume_table.py @@ -0,0 +1,4 @@ +# Databricks notebook source +source_table = "main.analytics.daily_events" +# Insert consumer logic here +df = spark.read.table(source_table) diff --git a/knowledge_base/pydabs_job_with_for_each/README.md b/knowledge_base/pydabs_job_with_for_each/README.md new file mode 100644 index 0000000..ba958df --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/README.md @@ -0,0 +1,70 @@ +# pydabs_job_with_for_each + +This example demonstrates a simple Databricks job that uses a foreach task. + +* `src/`: Python source code for this project. + * `foreach/generate_items.py`: A notebook which returns a list of items to be used for task generation. 
+ * `foreach/process_item.py`: A notebook which will process an item. +* `resources/`: Resource configurations (jobs, pipelines, etc.) + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext. + +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +## Using this project with the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] for_each_task_example` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the default job from the template has a schedule that runs every day + (defined in resources/sample_job.job.yml).
The schedule + is paused when deploying in development mode (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5. Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_with_for_each/databricks.yml b/knowledge_base/pydabs_job_with_for_each/databricks.yml new file mode 100644 index 0000000..e5d5f88 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/databricks.yml @@ -0,0 +1,19 @@ +# This is a Databricks asset bundle definition for pydabs_job_with_foreach. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_with_foreach + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_for_each/pyproject.toml b/knowledge_base/pydabs_job_with_for_each/pyproject.toml new file mode 100644 index 0000000..b6439ce --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_with_for_each" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend =
"hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_with_for_each/resources/__init__.py b/knowledge_base/pydabs_job_with_for_each/resources/__init__.py new file mode 100644 index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. + """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py new file mode 100644 index 0000000..804e085 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/resources/for_each_simple.py @@ -0,0 +1,27 @@ +from databricks.bundles.jobs import Job, Task, NotebookTask, ForEachTask, TaskDependency + +generate_items = Task( + task_key="generate_items", + notebook_task=NotebookTask(notebook_path="src/foreach/generate_items.py"), +) + +process_item = Task( + task_key="process_item", + for_each_task=ForEachTask( + inputs="{{tasks.generate_items.values.items}}", + task=Task( + task_key="process_item_iteration", + notebook_task=NotebookTask( + notebook_path="src/foreach/process_item.py", + base_parameters={"item": "{{input}}"}, + ), + ), + concurrency=10, + ), + depends_on=[TaskDependency(task_key="generate_items")], +) + +job = Job( + name="for_each_task_example", + tasks=[generate_items, process_item], +) diff --git a/knowledge_base/pydabs_job_with_for_each/src/foreach/generate_items.py 
b/knowledge_base/pydabs_job_with_for_each/src/foreach/generate_items.py new file mode 100644 index 0000000..3244348 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/foreach/generate_items.py @@ -0,0 +1,5 @@ +# Databricks notebook source +from databricks.sdk.runtime import dbutils + +items = [1, 2, 3] +dbutils.jobs.taskValues.set(key="items", value=items) diff --git a/knowledge_base/pydabs_job_with_for_each/src/foreach/process_item.py b/knowledge_base/pydabs_job_with_for_each/src/foreach/process_item.py new file mode 100644 index 0000000..1481a89 --- /dev/null +++ b/knowledge_base/pydabs_job_with_for_each/src/foreach/process_item.py @@ -0,0 +1,8 @@ +# Databricks notebook source + +# Runs once per item in the for-each. Do not call dbutils.jobs.taskValues.set() here. +from databricks.sdk.runtime import dbutils + +# Current iteration value passed from the for-each task (base_parameters: item = {{input}}) +current_item = dbutils.widgets.get("item") +print(f"Processing item: {current_item}") diff --git a/knowledge_base/pydabs_job_with_task_values/README.md b/knowledge_base/pydabs_job_with_task_values/README.md new file mode 100644 index 0000000..30097bf --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/README.md @@ -0,0 +1,69 @@ +# pydabs_job_with_task_values + +This example demonstrates a simple Databricks job that uses task values to exchange information between tasks. + +* `src/`: Python source code for this project. + * `src/xcom/`: Producer and consumer notebooks that exchange a task value. +* `resources/`: Resource configurations, in this case a Lakeflow Job with two tasks exchanging information. + + +## Getting started + +Choose how you want to work on this project: + +(a) Directly in your Databricks workspace, see + https://docs.databricks.com/dev-tools/bundles/workspace. + +(b) Locally with an IDE like Cursor or VS Code, see + https://docs.databricks.com/vscode-ext.
+ +(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html + +If you're developing with an IDE, dependencies for this project should be installed using uv: + +* Make sure you have the UV package manager installed. + It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/. +* Run `uv sync --dev` to install the project's dependencies. + + +## Using this project with the CLI + +The Databricks workspace and IDE extensions provide a graphical interface for working +with this project. It's also possible to interact with it directly using the CLI: + +1. Authenticate to your Databricks workspace, if you have not done so already: + ``` + $ databricks configure + ``` + +2. To deploy a development copy of this project, type: + ``` + $ databricks bundle deploy --target dev + ``` + (Note that "dev" is the default target, so the `--target` parameter + is optional here.) + + This deploys everything that's defined for this project. + For example, this project will deploy a job called + `[dev yourname] xcom_to_task_values_example` to your workspace. + You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**. + +3. Similarly, to deploy a production copy, type: + ``` + $ databricks bundle deploy --target prod + ``` + Note that the job in this example (defined in resources/task_values_simple.py) + has no schedule. Any schedule you add + is paused when deploying in development mode (see + https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). + +4. To run a job or pipeline, use the "run" command: + ``` + $ databricks bundle run + ``` + +5.
Finally, to run tests locally, use `pytest`: + ``` + $ uv run pytest + ``` + diff --git a/knowledge_base/pydabs_job_with_task_values/databricks.yml b/knowledge_base/pydabs_job_with_task_values/databricks.yml new file mode 100644 index 0000000..fe2f64b --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/databricks.yml @@ -0,0 +1,19 @@ +# This is a Databricks asset bundle definition for pydabs_job_with_task_values. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. +bundle: + name: pydabs_job_with_task_values + +python: + venv_path: .venv + # Functions called to load resources defined in Python. See resources/__init__.py + resources: + - "resources:load_resources" + +include: + - resources/*.yml + - resources/*/*.yml + +targets: + dev: + mode: development + default: true \ No newline at end of file diff --git a/knowledge_base/pydabs_job_with_task_values/pyproject.toml b/knowledge_base/pydabs_job_with_task_values/pyproject.toml new file mode 100644 index 0000000..c280862 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "pydabs_job_with_task_values" +version = "0.0.1" +authors = [{ name = "Databricks Field Engineering" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-connect>=15.4,<15.5", + "databricks-bundles==0.275.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/knowledge_base/pydabs_job_with_task_values/resources/__init__.py b/knowledge_base/pydabs_job_with_task_values/resources/__init__.py new file mode 100644
index 0000000..fbcb9dc --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/resources/__init__.py @@ -0,0 +1,16 @@ +from databricks.bundles.core import ( + Bundle, + Resources, + load_resources_from_current_package_module, +) + + +def load_resources(bundle: Bundle) -> Resources: + """ + 'load_resources' function is referenced in databricks.yml and is responsible for loading + bundle resources defined in Python code. This function is called by Databricks CLI during + bundle deployment. After deployment, this function is not used. + """ + + # the default implementation loads all Python files in 'resources' directory + return load_resources_from_current_package_module() diff --git a/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py new file mode 100644 index 0000000..5ded8a0 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/resources/task_values_simple.py @@ -0,0 +1,17 @@ +from databricks.bundles.jobs import Job, Task, NotebookTask, TaskDependency + +producer = Task( + task_key="producer", + notebook_task=NotebookTask(notebook_path="src/xcom/producer.py"), +) + +consumer = Task( + task_key="consumer", + depends_on=[TaskDependency(task_key="producer")], + notebook_task=NotebookTask(notebook_path="src/xcom/consumer.py"), +) + +job = Job( + name="xcom_to_task_values_example", + tasks=[producer, consumer], +) diff --git a/knowledge_base/pydabs_job_with_task_values/src/xcom/__init__.py b/knowledge_base/pydabs_job_with_task_values/src/xcom/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/knowledge_base/pydabs_job_with_task_values/src/xcom/consumer.py b/knowledge_base/pydabs_job_with_task_values/src/xcom/consumer.py new file mode 100644 index 0000000..96442b2 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/xcom/consumer.py @@ -0,0 +1,5 @@ +# Databricks notebook source +from databricks.sdk.runtime import dbutils + 
+val = dbutils.jobs.taskValues.get(taskKey="producer", key="answer", debugValue=None) +print(f"Got value: {val}") diff --git a/knowledge_base/pydabs_job_with_task_values/src/xcom/producer.py b/knowledge_base/pydabs_job_with_task_values/src/xcom/producer.py new file mode 100644 index 0000000..903d077 --- /dev/null +++ b/knowledge_base/pydabs_job_with_task_values/src/xcom/producer.py @@ -0,0 +1,6 @@ +# Databricks notebook source +from databricks.sdk.runtime import dbutils + +value = 42 +dbutils.jobs.taskValues.set(key="answer", value=value) +print(f"Produced value: {value}")