diff --git a/.github/workflows/onpush.yml b/.github/workflows/onpush.yml
index 85ca194..98cf18d 100644
--- a/.github/workflows/onpush.yml
+++ b/.github/workflows/onpush.yml
@@ -47,7 +47,16 @@ jobs:
- name: Deploy on staging
run: |
- make deploy-serverless env=staging
+ BRANCH_NAME="${{ github.head_ref || github.ref_name }}"
+ PR_NUMBER="${{ github.event.pull_request.number }}"
+ DEVELOPER="${{ github.actor }}"
+
+ uv run python ./scripts/generate_template_workflow.py staging --serverless \
+ --branch "$BRANCH_NAME" \
+ --developer "$DEVELOPER" \
+ $(if [ -n "$PR_NUMBER" ]; then echo "--pr-number $PR_NUMBER"; fi)
+
+ uv run databricks bundle deploy --target staging
- name: Run on staging (integration tests)
run: |
@@ -55,4 +64,13 @@ jobs:
- name: Deploy on prod
run: |
- make deploy-serverless env=prod
+ BRANCH_NAME="${{ github.head_ref || github.ref_name }}"
+ PR_NUMBER="${{ github.event.pull_request.number }}"
+ DEVELOPER="${{ github.actor }}"
+
+ uv run python ./scripts/generate_template_workflow.py prod --serverless \
+ --branch "$BRANCH_NAME" \
+ --developer "$DEVELOPER" \
+ $(if [ -n "$PR_NUMBER" ]; then echo "--pr-number $PR_NUMBER"; fi)
+
+ uv run databricks bundle deploy --target prod
diff --git a/.gitignore b/.gitignore
index ffe59e1..fb2a185 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,17 +2,14 @@ notes/
.databricks/
.vscode/
.venv/
-*.pyc
-*.lock
-__pycache__/
+.claude/
.pytest_cache/
dist/
build/
-output/
coverage_reports/
-.claude/
src/template.egg-info/
+CLAUDE.md
+*.pyc
+*.lock
+.coverage*
resources/workflow.yml
-.coverage
-.coverage.*
-CLAUDE.md
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 4312ada..490a572 100644
--- a/Makefile
+++ b/Makefile
@@ -10,10 +10,6 @@ pre-commit:
pre-commit autoupdate
pre-commit run --all-files
-deploy:
- uv run python ./scripts/generate_template_workflow.py $(env)
- uv run databricks bundle deploy --target $(env)
-
deploy-serverless:
uv run python ./scripts/generate_template_workflow.py $(env) --serverless
uv run databricks bundle deploy --target $(env)
diff --git a/README.md b/README.md
index 5907811..4a77e42 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,9 @@
# databricks-template
-> A production-ready PySpark project template with medallion architecture, Python packaging, unit tests, integration tests, CI/CD automation, Databricks Asset Bundles, and DQX data quality framework.
+> A production-ready PySpark project template with medallion architecture, Python packaging, unit tests, integration tests, test coverage reporting, CI/CD automation, Databricks Asset Bundles, and DQX data quality framework.

-
+


@@ -18,12 +18,14 @@ Interested in bringing these principles in your own project? Let's [connect o
## π§ͺ Technologies
- Databricks Free Edition (Serverless)
-- Databricks Runtime 17.3 LTS
-- PySpark 4.0
-- Python 3.12+
-- Unity Catalog
+- Databricks Runtime 18.0 LTS
- Databricks Asset Bundles
- Databricks DQX
+- Databricks CLI
+- Databricks Python SDK
+- PySpark 4.1
+- Python 3.12+
+- Unity Catalog
- GitHub Actions
- Pytest
@@ -32,8 +34,9 @@ Interested in bringing these principles in your own project? Let's [connect o
This project template demonstrates how to:
- structure PySpark code inside classes/packages.
-- structure unit tests for the data transformations and set up VSCode to run them on your local machine.
+- run unit tests on transformations with [pytest package](https://pypi.org/project/pytest/) - set up VSCode to run unit tests on your local machine.
- structure integration tests to be executed on different environments / catalogs.
+- utilize [coverage package](https://pypi.org/project/coverage/) to generate test coverage reports.
- package and deploy code to different environments (dev, staging, prod) using a CI/CD pipeline with [Github Actions](https://docs.github.com/en/actions).
- isolate "dev" environments / catalogs to avoid concurrency issues between developers testing jobs.
- utilize [uv](https://docs.astral.sh/uv/) as a project/package manager.
@@ -42,25 +45,19 @@ This project template demonstrates how to:
- use [medallion architecture](https://www.databricks.com/glossary/medallion-architecture) pattern.
- lint and format code with [ruff](https://docs.astral.sh/ruff/) and [pre-commit](https://pre-commit.com/).
- use a Make file to automate repetitive tasks.
-- utilize [pytest package](https://pypi.org/project/pytest/) to run unit tests on transformations and generate test coverage reports.
- utilize [argparse package](https://pypi.org/project/argparse/) to build a flexible command line interface to start the jobs.
-- utilize [funcy package](https://pypi.org/project/funcy/) to log the execution time of each transformation.
- utilize [Databricks Asset Bundles](https://docs.databricks.com/en/dev-tools/bundles/index.html) to package/deploy/run a Python wheel package on Databricks.
-- utilize [Databricks DQX](https://databrickslabs.github.io/dqx/) to define and enforce data quality rules, such as null checks, uniqueness, thresholds, and schema validation.
-- utilize [Databricks SDK for Python](https://docs.databricks.com/en/dev-tools/sdk-python.html) to manage workspaces and accounts. The sample script enables metastore system tables with [relevant data about billing, usage, lineage, prices, and access](https://www.youtube.com/watch?v=LcRWHzk8Wm4).
+- utilize [Databricks DQX](https://databrickslabs.github.io/dqx/) to define and enforce data quality rules, such as null checks, uniqueness, thresholds, and schema validation, and filter bad data on quarantine tables.
+- utilize [Databricks SDK for Python](https://docs.databricks.com/en/dev-tools/sdk-python.html) to manage workspaces and accounts and analyse costs. Refer to 'scripts' folder for some examples.
- utilize [Databricks Unity Catalog](https://www.databricks.com/product/unity-catalog) and get data lineage for your tables and columns and a simplified permission model for your data.
- utilize [Databricks Lakeflow Jobs](https://docs.databricks.com/en/workflows/index.html) to execute a DAG and [task parameters](https://docs.databricks.com/en/workflows/jobs/parameter-value-references.html) to share context information between tasks (see [Task Parameters section](#task-parameters)). Yes, you don't need Airflow to manage your DAGs here!!!
-- **utilize serverless clusters on Databricks Free Edition to deploy your pipelines.**
-- utilize [Databricks job clusters](https://docs.databricks.com/en/workflows/jobs/use-compute.html#use-databricks-compute-with-your-jobs) to reduce costs.
-- define Databricks clusters on AWS and Azure.
+- utilize serverless job clusters on [Databricks Free Edition](https://docs.databricks.com/aws/en/getting-started/free-edition) to deploy your pipelines.
## π§ Resources
-- [Goodbye Pip and Poetry. Why UV Might Be All You Need](https://codecut.ai/why-uv-might-all-you-need/)
-
For a debate on the use of notebooks vs. Python packaging, please refer to:
- [The Rise of The Notebook Engineer](https://dataengineeringcentral.substack.com/p/the-rise-of-the-notebook-engineer)
- [Please don't make me use Databricks notebooks](https://medium.com/@seade03/please-dont-make-me-use-databricks-notebooks-3d07a4a332ae)
@@ -73,7 +70,64 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle
- [Deploying Databricks Asset Bundles (DABs) at Scale](https://www.youtube.com/watch?v=mMwprgB-sIU)
- [A Prescription for Success: Leveraging DABs for Faster Deployment and Better Patient Outcomes](https://www.youtube.com/watch?v=01JHTM2UP-U)
-## Jobs (former Workflows)
+Other:
+- [Goodbye Pip and Poetry. Why UV Might Be All You Need](https://codecut.ai/why-uv-might-all-you-need/)
+
+## π Folder Structure
+
+```
+databricks-template/
+β
+βββ .github/ # CI/CD automation
+β βββ workflows/
+β βββ onpush.yml # GitHub Actions pipeline
+β
+βββ src/ # Main source code
+β βββ template/ # Python package
+β βββ main.py # Entry point with CLI (argparse)
+β βββ config.py # Configuration management
+β βββ baseTask.py # Base class for all tasks
+β βββ commonSchemas.py # Shared PySpark schemas
+β βββ job1/ # Job-specific tasks
+β βββ extract_source1.py
+β βββ extract_source2.py
+β βββ generate_orders.py
+β βββ generate_orders_agg.py
+β βββ integration_setup.py
+β βββ integration_validate.py
+β
+βββ tests/ # Unit tests
+β βββ job1/
+β βββ unit_test.py # Pytest unit tests
+β
+βββ resources/ # Databricks workflow templates
+β βββ wf_template_serverless.yml # Jinja2 template for serverless
+β βββ wf_template.yml # Jinja2 template for job clusters
+β βββ workflow.yml # Generated workflow (auto-created)
+β
+βββ scripts/ # Helper scripts
+β βββ generate_template_workflow.py # Workflow generator (Jinja2)
+β βββ sdk_analyze_job_costs.py # Cost analysis script
+β βββ sdk_workspace_and_account.py # Workspace and account management
+β
+βββ docs/ # Documentation assets
+β βββ dag.png
+β βββ task_output.png
+β βββ data_lineage.png
+β βββ data_quality.png
+β βββ ci_cd.png
+β
+βββ dist/ # Build artifacts (Python wheel)
+βββ coverage_reports/ # Test coverage reports
+β
+βββ databricks.yml # Databricks Asset Bundle config
+βββ pyproject.toml # Python project configuration (uv)
+βββ Makefile # Build automation
+βββ .pre-commit-config.yaml # Pre-commit hooks (ruff)
+βββ README.md # This file
+```
+
+## Jobs
@@ -89,10 +143,11 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle
-## Data Lineage (Catalog Explorer)
+## Data Lineage
+
@@ -117,47 +172,27 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle
## Instructions
-### 1) Create a Databricks Workspace
+1) Create a workspace. Use a [Databricks Free Edition](https://docs.databricks.com/aws/en/getting-started/free-edition) workspace.
-option 1) utilize a [Databricks Free Edition](https://docs.databricks.com/aws/en/getting-started/free-edition) workspace.
-option 2) create a Premium workspace. Follow instructions [here](https://github.com/databricks/terraform-databricks-examples)
+2) Install and configure Databricks CLI on your local machine. Follow instructions [here](https://docs.databricks.com/en/dev-tools/cli/install.html). Check the current version on databricks.yml.
-### 2) Install and configure Databricks CLI on your local machine
-
-Follow the instructions [here](https://docs.databricks.com/en/dev-tools/cli/install.html)
-
-
-### 3) Build Python env and execute unit tests on your local machine
+3) Build Python env and execute unit tests on your local machine
make sync & make test
-You can also execute unit tests from your preferred IDE. Here's a screenshot from [VS Code](https://code.visualstudio.com/) with [Microsoft's Python extension](https://marketplace.visualstudio.com/items?itemName=ms-python.python) installed.
-
-
-### 4) Deploy and execute on the dev workspace.
-
-option 1) for Databricks Free Edition use:
+4) Deploy and execute on the dev workspace.
make deploy-serverless env=dev
- make deploy-serverless env=staging
- make deploy-serverless env=prod
-
-
-option 2) for Premium workspace:
-
- Update "job_clusters" properties on wf_template.yml file. There are different properties for AWS and Azure.
- make deploy env=dev
- make deploy env=staging
- make deploy env=prod
+5) Configure CI/CD automation. Set up [GitHub Actions repository secrets](https://docs.github.com/en/actions/security-guides/using-secrets-in-github-actions) (DATABRICKS_HOST and DATABRICKS_TOKEN).
-### 5) configure CI/CD automation
+6) You can also execute unit tests from your preferred IDE. Here's a screenshot from [VS Code](https://code.visualstudio.com/) with [Microsoft's Python extension](https://marketplace.visualstudio.com/items?itemName=ms-python.python) installed.
-Configure [Github Actions repository secrets](https://docs.github.com/en/actions/security-guides/using-secrets-in-github-actions) DATABRICKS_HOST and DATABRICKS_TOKEN.
+-
## Task parameters
diff --git a/databricks.yml b/databricks.yml
index 6bc5d95..e3ced86 100644
--- a/databricks.yml
+++ b/databricks.yml
@@ -2,6 +2,7 @@
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: default_python
+ databricks_cli_version: ">=0.286.0"
artifacts:
default:
diff --git a/docs/ci_cd.drawio b/docs/ci_cd.drawio
index bb83077..6fe2533 100755
--- a/docs/ci_cd.drawio
+++ b/docs/ci_cd.drawio
@@ -1,17 +1,17 @@
-
+
-
+
-
+
-
+
@@ -21,30 +21,30 @@
-
-
+
+
-
-
+
+
-
-
+
+
-
+
-
-
+
+
-
+
@@ -55,17 +55,11 @@
-
-
+
+
-
-
-
-
-
-
-
-
+
+
diff --git a/docs/ci_cd.png b/docs/ci_cd.png
index 8251497..7d63661 100644
Binary files a/docs/ci_cd.png and b/docs/ci_cd.png differ
diff --git a/pyproject.toml b/pyproject.toml
index e2bfde1..0162d41 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,23 +6,23 @@ authors = [
{name = "user@company.com"}
]
readme = "README.md"
-requires-python = ">=3.12"
+requires-python = "==3.12.*"
dependencies = [
"funcy==2.0",
- "databricks-labs-dqx==0.9.3",
+ "databricks-labs-dqx==0.12.0",
]
[project.optional-dependencies]
dev = [
"numpy==2.1.3",
"pandas==2.2.3",
- "pyarrow==19.0.1",
+ "pyarrow==21.0.0",
"pydantic==2.10.6",
"coverage==7.6.1",
"pre-commit==4.0.1",
"pytest==8.3.5",
"pytest-cov==5.0.0",
- "pyspark==4.0.0",
+ "pyspark==4.1.0",
"jinja2==3.1.5",
]
diff --git a/resources/wf_template.yml b/resources/wf_template.yml
deleted file mode 100644
index 8f02ff1..0000000
--- a/resources/wf_template.yml
+++ /dev/null
@@ -1,95 +0,0 @@
-# The main job for default_python
-resources:
- jobs:
-
- job1:
- name: template_${bundle.target}
- timeout_seconds: 3600
-
- {% if environment == 'prod'%}
- schedule:
- quartz_cron_expression: "0 0 5 * * ?"
- timezone_id: "UTC"
-
- no_alert_for_skipped_runs: false
- {% endif %}
-
- tasks:
-
- - task_key: extract_source1
- job_cluster_key: cluster-dev-aws
- max_retries: 0
- python_wheel_task:
- package_name: template
- entry_point: main
- parameters: ["--task={{'{{task.name}}'}}",
- "--env=${bundle.target}",
- "${var.debug}"]
- libraries:
- - whl: ../dist/*.whl
-
- - task_key: extract_source2
- job_cluster_key: cluster-dev-aws
- max_retries: 0
- python_wheel_task:
- package_name: template
- entry_point: main
- parameters: ["--task={{'{{task.name}}'}}",
- "--env=${bundle.target}",
- "${var.debug}"]
- libraries:
- - whl: ../dist/*.whl
-
- - task_key: generate_orders
- depends_on:
- - task_key: extract_source1
- - task_key: extract_source2
- job_cluster_key: cluster-dev-aws
- max_retries: 0
- python_wheel_task:
- package_name: template
- entry_point: main
- parameters: ["--task={{'{{task.name}}'}}",
- "--env=${bundle.target}",
- "${var.debug}"]
- libraries:
- - whl: ../dist/*.whl
-
- - task_key: generate_orders_agg
- depends_on:
- - task_key: generate_orders
- job_cluster_key: cluster-dev-aws
- max_retries: 0
- python_wheel_task:
- package_name: template
- entry_point: main
- parameters: ["--task={{'{{task.name}}'}}",
- "--env=${bundle.target}",
- "${var.debug}"]
- libraries:
- - whl: ../dist/*.whl
-
- job_clusters:
- # - job_cluster_key: cluster-dev-azure
- # new_cluster:
- # spark_version: 15.3.x-scala2.12
- # node_type_id: Standard_D8as_v5
- # num_workers: 1
- # azure_attributes:
- # first_on_demand: 1
- # availability: SPOT_AZURE
- # data_security_mode: SINGLE_USER
-
- - job_cluster_key: cluster-dev-aws
- new_cluster:
- spark_version: 14.2.x-scala2.12
- node_type_id: c5d.xlarge
- num_workers: 1
- aws_attributes:
- first_on_demand: 1
- availability: SPOT_WITH_FALLBACK
- zone_id: auto
- spot_bid_price_percent: 100
- ebs_volume_count: 0
- policy_id: 001934F3ABD02D4A
- data_security_mode: SINGLE_USER
diff --git a/resources/wf_template_serverless.yml b/resources/wf_template_serverless.yml
index d30c6d3..372a7a5 100644
--- a/resources/wf_template_serverless.yml
+++ b/resources/wf_template_serverless.yml
@@ -6,6 +6,12 @@ resources:
name: template_${bundle.target}
timeout_seconds: 3600
+ # Git metadata tags for traceability
+ tags:
+ git_branch: "{{ branch }}"
+ deployed_by: "{{ developer }}"
+ {% if pr_number %}pr_number: "{{ pr_number }}"{% endif %}
+
# A list of task execution environment specifications that can be referenced by tasks of this job.
deployment:
kind: BUNDLE
@@ -85,6 +91,12 @@ resources:
name: template_${bundle.target}_integration_test
timeout_seconds: 3600
+ # Git metadata tags for traceability
+ tags:
+ git_branch: "{{ branch }}"
+ deployed_by: "{{ developer }}"
+ {% if pr_number %}pr_number: "{{ pr_number }}"{% endif %}
+
environments:
- environment_key: default
spec:
diff --git a/scripts/generate_template_workflow.py b/scripts/generate_template_workflow.py
index 1f23555..6623dd8 100644
--- a/scripts/generate_template_workflow.py
+++ b/scripts/generate_template_workflow.py
@@ -1,32 +1,82 @@
import sys
-
+import subprocess
+import argparse
from jinja2 import Environment, FileSystemLoader
-if len(sys.argv) not in [2, 3]:
- print("Usage: python generate_workflow.py [serverless]")
- print("Example: python generate_workflow.py prod")
- print("Example: python generate_workflow.py prod serverless")
- sys.exit(1)
-serverless = len(sys.argv) == 3 and sys.argv[2].lower() == "--serverless"
-print(sys.argv[2].lower())
-print(f"Serverless mode: {serverless}")
+def get_git_branch():
+ """Get current git branch name, fallback to 'unknown' if not available."""
+ try:
+ branch = (
+ subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL)
+ .decode("utf-8")
+ .strip()
+ )
+ return branch
+ except Exception:
+ return "unknown"
+
+
+def get_git_user():
+ """Get git user name, fallback to 'unknown' if not available."""
+ try:
+ user = (
+ subprocess.check_output(["git", "config", "user.name"], stderr=subprocess.DEVNULL).decode("utf-8").strip()
+ )
+ return user
+ except Exception:
+ return "unknown"
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Generate Databricks workflow YAML from Jinja2 template",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ python generate_template_workflow.py dev --serverless
+ python generate_template_workflow.py staging --serverless --branch main --developer john --pr-number 123
+ """,
+ )
+
+ parser.add_argument("environment", help="Target environment (dev, staging, prod)")
+ parser.add_argument("--serverless", action="store_true", help="Use serverless workflow template")
+ parser.add_argument("--branch", help="Git branch name (auto-detected if not provided)")
+ parser.add_argument("--developer", help="Developer/deployer name (auto-detected if not provided)")
+ parser.add_argument("--pr-number", help="Pull request number (optional)")
+
+ args = parser.parse_args()
+
+ # Get or auto-detect git metadata
+ branch = args.branch if args.branch else get_git_branch()
+ developer = args.developer if args.developer else get_git_user()
+ pr_number = args.pr_number if args.pr_number else ""
+
+ print(f"Environment: {args.environment}")
+ print(f"Serverless mode: {args.serverless}")
+ print(f"Git branch: {branch}")
+ print(f"Developer: {developer}")
+ print(f"PR number: {pr_number if pr_number else 'N/A'}")
+
+ # Load and render template
+ file_loader = FileSystemLoader(".")
+ env = Environment(loader=file_loader)
+
+ if args.serverless:
+ template = env.get_template("/resources/wf_template_serverless.yml")
+ else:
+ template = env.get_template("/resources/wf_template.yml")
-environment = sys.argv[1]
+ # Render the template with all variables
+ output = template.render(environment=args.environment, branch=branch, developer=developer, pr_number=pr_number)
-file_loader = FileSystemLoader(".")
-env = Environment(loader=file_loader)
-if serverless:
- template = env.get_template("/resources/wf_template_serverless.yml")
-else:
- template = env.get_template("/resources/wf_template.yml")
+ # Save the rendered YAML to a file
+ output_file = "./resources/workflow.yml"
+ with open(output_file, "w") as f:
+ f.write(output)
-# Render the template with the environment variable
-output = template.render(environment=environment)
+ print(f"\nGenerated {output_file}")
-# Save the rendered YAML to a file
-output_file = "./resources/workflow.yml"
-with open(output_file, "w") as f:
- f.write(output)
-print(f"Generated {output_file}")
+if __name__ == "__main__":
+ main()
diff --git a/scripts/sdk_analyze_job_costs.py b/scripts/sdk_analyze_job_costs.py
new file mode 100644
index 0000000..e39b735
--- /dev/null
+++ b/scripts/sdk_analyze_job_costs.py
@@ -0,0 +1,231 @@
+"""
+Analyze costs of Databricks jobs run today.
+
+This script queries Databricks system tables to calculate costs for jobs
+executed today, providing insights into compute usage and billing.
+
+Usage:
+ python scripts/sdk_analyze_job_costs.py [--profile PROFILE] [--date DATE]
+
+Examples:
+ python scripts/sdk_analyze_job_costs.py
+ python scripts/sdk_analyze_job_costs.py --profile dev
+ python scripts/sdk_analyze_job_costs.py --date 2024-01-15
+"""
+
+import argparse
+from datetime import datetime, timedelta
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.service.sql import StatementState
+import time
+
+
+def get_todays_date():
+ """Get today's date in YYYY-MM-DD format."""
+ return datetime.now().strftime("%Y-%m-%d")
+
+
+def analyze_job_costs(workspace: WorkspaceClient, target_date: str):
+ """
+ Analyze job costs for a specific date using system tables.
+
+ Args:
+ workspace: Databricks WorkspaceClient instance
+ target_date: Date to analyze in YYYY-MM-DD format
+ """
+ print(f"\n{'=' * 80}")
+ print(f"Job Cost Analysis for {target_date}")
+ print(f"{'=' * 80}\n")
+
+ # Query to get usage data from system tables
+ # Note: This assumes system tables are enabled for the metastore
+ query = f"""
+ SELECT
+ u.usage_metadata.job_id,
+ u.usage_metadata.job_name,
+ u.sku_name,
+ u.cloud,
+ SUM(usage_quantity) as total_dbu,
+ COUNT(DISTINCT u.usage_metadata.job_run_id) as num_runs,
+ ROUND(SUM(usage_quantity * list_prices.pricing.default), 2) as estimated_cost_usd
+ FROM
+ system.billing.usage u
+ LEFT JOIN
+ system.billing.list_prices ON u.sku_name = list_prices.sku_name
+ AND u.cloud = list_prices.cloud
+ WHERE
+ usage_date = '{target_date}'
+ AND
+ u.usage_metadata.job_id IS NOT NULL
+ GROUP BY
+ u.usage_metadata.job_id,
+ u.usage_metadata.job_name,
+ u.sku_name,
+ u.cloud
+ ORDER BY
+ estimated_cost_usd DESC
+ """
+
+ try:
+ print("Querying system.billing.usage table...\n")
+ result = workspace.statement_execution.execute_statement(
+ warehouse_id=get_warehouse_id(workspace),
+ statement=query,
+ wait_timeout="0s", # async
+ )
+
+ # Poll for result completion
+ while True:
+ result = workspace.statement_execution.get_statement(result.statement_id)
+ print("Waiting for query to complete...")
+ state = result.status.state
+ print(state)
+ if state in [StatementState.SUCCEEDED, StatementState.FAILED, StatementState.CANCELED]:
+ break
+ time.sleep(1)
+
+ # print(result.result)
+ # print("-----")
+ # print(result.result.data_array)
+
+ # Process and display results
+ if result.result and result.result.data_array:
+ display_job_costs(result.result.data_array)
+ display_summary(result.result.data_array)
+ else:
+ print(f"No job runs found for {target_date}")
+
+ except Exception as e:
+ print(f"Error querying system tables: {e}")
+ fallback_analysis(workspace, target_date)
+
+
+def get_warehouse_id(workspace: WorkspaceClient) -> str:
+ """Get the first available SQL warehouse ID."""
+ warehouses = list(workspace.warehouses.list())
+ if not warehouses:
+ raise ValueError("No SQL warehouse found. Please create one to query system tables.")
+ return warehouses[0].id
+
+
+def display_job_costs(data_array):
+ print(f"{'Job ID':<15} {'Job Name':<40} {'SKU':<40} {'Runs':>8} {'DBUs':>10} {'Cost (USD)':>12}")
+ print("-" * 110)
+
+ for row in data_array:
+ job_id = row[0] or "N/A"
+ job_name = row[1] or "Unnamed"
+ sku = row[2] or "Unnamed"
+ total_dbu = float(row[4] or 0)
+ num_runs = int(row[5] or 0)
+ cost = float(row[6] or 0)
+
+ print(f"{str(job_id):<15} {job_name:<40.40} {sku:<40.40} {num_runs:>8} {total_dbu:>10.2f} ${cost:>11.2f}")
+
+
+def display_summary(data_array):
+ print("\nSUMMARY")
+ print("=" * 80)
+
+ total_jobs = len(set([row[0] for row in data_array if row[0]]))
+ total_dbu = sum([float(row[4] or 0) for row in data_array])
+ # total_runs = sum([int(row[5] or 0) for row in data_array])
+ total_cost = sum([float(row[6] or 0) for row in data_array])
+
+ print(f"Total Jobs: {total_jobs}")
+ # print(f"Total Runs: {total_runs}")
+ print(f"Total DBUs: {total_dbu:.2f}")
+ print(f"Total Cost: ${total_cost:.2f}")
+ print("=" * 80 + "\n")
+
+
+def fallback_analysis(workspace: WorkspaceClient, target_date: str):
+ """
+ Fallback method using Jobs API when system tables are unavailable.
+ Note: This doesn't provide cost data, only run information.
+ """
+ print("\nFallback: Using Jobs API for run information (costs not available)\n")
+
+ target_datetime = datetime.strptime(target_date, "%Y-%m-%d")
+ start_time_ms = int(target_datetime.timestamp() * 1000)
+ end_time_ms = int((target_datetime + timedelta(days=1)).timestamp() * 1000)
+
+ print(f"{'Job ID':<15} {'Job Name':<40} {'Status':<15} {'Start Time':<20}")
+ print("-" * 95)
+
+ job_count = 0
+ run_count = 0
+
+ try:
+ # List all jobs
+ for job in workspace.jobs.list():
+ # Get runs for this job
+ runs = workspace.jobs.list_runs(job_id=job.job_id, start_time_from=start_time_ms, start_time_to=end_time_ms)
+
+ for run in runs:
+ if run.start_time and start_time_ms <= run.start_time < end_time_ms:
+ job_name = job.settings.name if job.settings and job.settings.name else "Unnamed"
+ if len(job_name) > 38:
+ job_name = job_name[:35] + "..."
+
+ start_time = datetime.fromtimestamp(run.start_time / 1000).strftime("%Y-%m-%d %H:%M:%S")
+ status = run.state.life_cycle_state if run.state else "UNKNOWN"
+
+ print(f"{job.job_id:<15} {job_name:<40} {status:<15} {start_time:<20}")
+ run_count += 1
+
+ if runs:
+ job_count += 1
+
+ print(f"\nFound {run_count} runs across {job_count} jobs")
+ print("Note: Cost information requires system.billing.usage table access\n")
+
+ except Exception as e:
+ print(f"Error accessing Jobs API: {e}\n")
+
+
+def main():
+ """Main entry point for the script."""
+ parser = argparse.ArgumentParser(
+ description="Analyze Databricks job costs for today",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ python scripts/sdk_analyze_job_costs.py
+ python scripts/sdk_analyze_job_costs.py --profile dev
+ python scripts/sdk_analyze_job_costs.py --date 2024-01-15
+ """,
+ )
+
+ parser.add_argument("--profile", default="dev", help="Databricks CLI profile to use (default: dev)")
+
+ parser.add_argument(
+ "--date", default=get_todays_date(), help="Date to analyze in YYYY-MM-DD format (default: today)"
+ )
+
+ args = parser.parse_args()
+
+ # Validate date format
+ try:
+ datetime.strptime(args.date, "%Y-%m-%d")
+ except ValueError:
+ print(f"Error: Invalid date format '{args.date}'. Use YYYY-MM-DD")
+ return 1
+
+ # Initialize Databricks workspace client
+ try:
+ workspace = WorkspaceClient(profile=args.profile)
+ print(f"Connected to: {workspace.config.host}")
+ except Exception as e:
+ print(f"Error connecting to Databricks: {e}")
+ print(f"Ensure profile '{args.profile}' exists in ~/.databrickscfg")
+ return 1
+
+ # Run cost analysis
+ analyze_job_costs(workspace, args.date)
+
+ return 0
+
+
+if __name__ == "__main__":
+ exit(main())
diff --git a/scripts/sdk_system_tables.py b/scripts/sdk_system_tables.py
deleted file mode 100644
index 4f9b835..0000000
--- a/scripts/sdk_system_tables.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from databricks.sdk import WorkspaceClient
-from databricks.sdk import AccountClient
-import requests
-
-host = None
-token = None
-
-
-def demoWorkspaceApi(w):
- for j in w.jobs.list():
- print("Job: " + str(j.job_id))
-
- for c in w.catalogs.list():
- print("Catalog: " + c.name)
- schemas = w.schemas.list(catalog_name=c.name)
- for s in schemas:
- print(" Schema: " + s.name)
-
-
-def demoAccountApi():
- a = AccountClient(profile="account")
-
- print(a)
-
- for m in a.metastores.list():
- print("Metastore: " + m.metastore_id)
- metastore = m
-
- return metastore
-
-
-def enable(system_tables, metastore):
- print("Enabling " + system_tables + " tables for " + metastore.name + " ...")
-
- update = f"{host}/api/2.0/unity-catalog/metastores/{metastore.metastore_id}/systemschemas/{system_tables}"
- response = requests.put(update, headers=token)
-
- if response.status_code == 200:
- print("OK")
- else:
- print("Failed")
- print(response.text)
-
-
-def enableSystemTables(metastore):
- enable("billing", metastore)
-
- enable("access", metastore)
-
- enable("storage", metastore)
-
- enable("compute", metastore)
-
- enable("marketplace", metastore)
-
- enable("lineage", metastore)
-
- list = f"{host}/api/2.0/unity-catalog/metastores/a238eb20-95d3-4a62-91ea-629992514227/systemschemas"
- response = requests.get(list, headers=token)
- print(response.text)
-
-
-if __name__ == "__main__":
- workspace = WorkspaceClient(
- profile="dev" # as configured in .databrickscfg
- )
-
- print(workspace)
-
- token = workspace.config.authenticate()
- host = workspace.config.host
-
- demoWorkspaceApi(workspace)
-
- metastore = demoAccountApi()
-
- enableSystemTables(metastore)
diff --git a/scripts/sdk_workspace_and_account.py b/scripts/sdk_workspace_and_account.py
new file mode 100644
index 0000000..e1fc25b
--- /dev/null
+++ b/scripts/sdk_workspace_and_account.py
@@ -0,0 +1,34 @@
+from databricks.sdk import WorkspaceClient
+from databricks.sdk import AccountClient
+import requests
+
+
+def demoWorkspaceApi():
+ workspace = WorkspaceClient(
+ profile="dev" # as configured in .databrickscfg
+ )
+
+ for j in workspace.jobs.list():
+ print("Job: " + str(j.job_id))
+
+ for c in workspace.catalogs.list():
+ print("Catalog: " + c.name)
+ schemas = workspace.schemas.list(catalog_name=c.name)
+ for s in schemas:
+ print(" Schema: " + s.name)
+
+
+def demoAccountApi():
+ # you need to run "databricks auth login -p <profile-name>" first...
+ a = AccountClient(profile="account1")
+
+ print(a)
+
+ for w in a.workspaces.list():
+ print("Workspace: " + w.workspace_name)
+
+
+if __name__ == "__main__":
+ demoWorkspaceApi()
+
+ demoAccountApi()