diff --git a/.github/workflows/onpush.yml b/.github/workflows/onpush.yml index 85ca194..98cf18d 100644 --- a/.github/workflows/onpush.yml +++ b/.github/workflows/onpush.yml @@ -47,7 +47,16 @@ jobs: - name: Deploy on staging run: | - make deploy-serverless env=staging + BRANCH_NAME="${{ github.head_ref || github.ref_name }}" + PR_NUMBER="${{ github.event.pull_request.number }}" + DEVELOPER="${{ github.actor }}" + + uv run python ./scripts/generate_template_workflow.py staging --serverless \ + --branch "$BRANCH_NAME" \ + --developer "$DEVELOPER" \ + $(if [ -n "$PR_NUMBER" ]; then echo "--pr-number $PR_NUMBER"; fi) + + uv run databricks bundle deploy --target staging - name: Run on staging (integration tests) run: | @@ -55,4 +64,13 @@ jobs: - name: Deploy on prod run: | - make deploy-serverless env=prod + BRANCH_NAME="${{ github.head_ref || github.ref_name }}" + PR_NUMBER="${{ github.event.pull_request.number }}" + DEVELOPER="${{ github.actor }}" + + uv run python ./scripts/generate_template_workflow.py prod --serverless \ + --branch "$BRANCH_NAME" \ + --developer "$DEVELOPER" \ + $(if [ -n "$PR_NUMBER" ]; then echo "--pr-number $PR_NUMBER"; fi) + + uv run databricks bundle deploy --target prod diff --git a/.gitignore b/.gitignore index ffe59e1..fb2a185 100644 --- a/.gitignore +++ b/.gitignore @@ -2,17 +2,14 @@ notes/ .databricks/ .vscode/ .venv/ -*.pyc -*.lock -__pycache__/ +.claude/ .pytest_cache/ dist/ build/ -output/ coverage_reports/ -.claude/ src/template.egg-info/ +CLAUDE.md +*.pyc +*.lock +.coverage* resources/workflow.yml -.coverage -.coverage.* -CLAUDE.md \ No newline at end of file diff --git a/Makefile b/Makefile index 4312ada..490a572 100644 --- a/Makefile +++ b/Makefile @@ -10,10 +10,6 @@ pre-commit: pre-commit autoupdate pre-commit run --all-files -deploy: - uv run python ./scripts/generate_template_workflow.py $(env) - uv run databricks bundle deploy --target $(env) - deploy-serverless: uv run python ./scripts/generate_template_workflow.py $(env) 
--serverless uv run databricks bundle deploy --target $(env) diff --git a/README.md b/README.md index 5907811..4a77e42 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ # databricks-template -> A production-ready PySpark project template with medallion architecture, Python packaging, unit tests, integration tests, CI/CD automation, Databricks Asset Bundles, and DQX data quality framework. +> A production-ready PySpark project template with medallion architecture, Python packaging, unit tests, integration tests, coverage tests, CI/CD automation, Databricks Asset Bundles, and DQX data quality framework. ![Databricks](https://img.shields.io/badge/platform-Databricks-orange?logo=databricks) -![PySpark](https://img.shields.io/badge/pyspark-4.0+-brightgreen?logo=apache-spark) +![PySpark](https://img.shields.io/badge/pyspark-4.1+-brightgreen?logo=apache-spark) ![CI/CD](https://img.shields.io/github/actions/workflow/status/andre-salvati/databricks-template/.github/workflows/onpush.yml) ![Stars](https://img.shields.io/github/stars/andre-salvati/databricks-template?style=social) @@ -18,12 +18,14 @@ Interested in bringing these principles in your own project? Let’s [connect o ## πŸ§ͺ Technologies - Databricks Free Edition (Serverless) -- Databricks Runtime 17.3 LTS -- PySpark 4.0 -- Python 3.12+ -- Unity Catalog +- Databricks Runtime 18.0 LTS - Databricks Asset Bundles - Databricks DQX +- Databricks CLI +- Databricks Python SDK +- PySpark 4.1 +- Python 3.12+ +- Unity Catalog - GitHub Actions - Pytest @@ -32,8 +34,9 @@ Interested in bringing these principles in your own project? Let’s [connect o This project template demonstrates how to: - structure PySpark code inside classes/packages. -- structure unit tests for the data transformations and set up VSCode to run them on your local machine. +- run unit tests on transformations with [pytest package](https://pypi.org/project/pytest/) - set up VSCode to run unit tests on your local machine. 
- structure integration tests to be executed on different environments / catalogs. +- utilize [coverage package](https://pypi.org/project/coverage/) to generate test coverage reports. - package and deploy code to different environments (dev, staging, prod) using a CI/CD pipeline with [Github Actions](https://docs.github.com/en/actions). - isolate "dev" environments / catalogs to avoid concurrency issues between developers testing jobs. - utilize [uv](https://docs.astral.sh/uv/) as a project/package manager. @@ -42,25 +45,19 @@ This project template demonstrates how to: - use [medallion architecture](https://www.databricks.com/glossary/medallion-architecture) pattern. - lint and format code with [ruff](https://docs.astral.sh/ruff/) and [pre-commit](https://pre-commit.com/). - use a Make file to automate repetitive tasks. -- utilize [pytest package](https://pypi.org/project/pytest/) to run unit tests on transformations and generate test coverage reports. - utilize [argparse package](https://pypi.org/project/argparse/) to build a flexible command line interface to start the jobs. -- utilize [funcy package](https://pypi.org/project/funcy/) to log the execution time of each transformation.
- utilize [Databricks Asset Bundles](https://docs.databricks.com/en/dev-tools/bundles/index.html) to package/deploy/run a Python wheel package on Databricks. -- utilize [Databricks DQX](https://databrickslabs.github.io/dqx/) to define and enforce data quality rules, such as null checks, uniqueness, thresholds, and schema validation. -- utilize [Databricks SDK for Python](https://docs.databricks.com/en/dev-tools/sdk-python.html) to manage workspaces and accounts. The sample script enables metastore system tables with [relevant data about billing, usage, lineage, prices, and access](https://www.youtube.com/watch?v=LcRWHzk8Wm4). +- utilize [Databricks DQX](https://databrickslabs.github.io/dqx/) to define and enforce data quality rules, such as null checks, uniqueness, thresholds, and schema validation, and filter bad data on quarantine tables. +- utilize [Databricks SDK for Python](https://docs.databricks.com/en/dev-tools/sdk-python.html) to manage workspaces and accounts and analyse costs. Refer to 'scripts' folder for some examples. - utilize [Databricks Unity Catalog](https://www.databricks.com/product/unity-catalog) and get data lineage for your tables and columns and a simplified permission model for your data. - utilize [Databricks Lakeflow Jobs](https://docs.databricks.com/en/workflows/index.html) to execute a DAG and [task parameters](https://docs.databricks.com/en/workflows/jobs/parameter-value-references.html) to share context information between tasks (see [Task Parameters section](#task-parameters)). Yes, you don't need Airflow to manage your DAGs here!!! -- **utilize serverless clusters on Databricks Free Edition to deploy your pipelines.** -- utilize [Databricks job clusters](https://docs.databricks.com/en/workflows/jobs/use-compute.html#use-databricks-compute-with-your-jobs) to reduce costs. -- define Databricks clusters on AWS and Azure. 
+- utilize serverless job clusters on [Databricks Free Edition](https://docs.databricks.com/aws/en/getting-started/free-edition ) to deploy your pipelines. ## 🧠 Resources -- [Goodbye Pip and Poetry. Why UV Might Be All You Need](https://codecut.ai/why-uv-might-all-you-need/) - For a debate on the use of notebooks vs. Python packaging, please refer to: - [The Rise of The Notebook Engineer](https://dataengineeringcentral.substack.com/p/the-rise-of-the-notebook-engineer) - [Please don’t make me use Databricks notebooks](https://medium.com/@seade03/please-dont-make-me-use-databricks-notebooks-3d07a4a332ae) @@ -73,7 +70,64 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle - [Deploying Databricks Asset Bundles (DABs) at Scale](https://www.youtube.com/watch?v=mMwprgB-sIU) - [A Prescription for Success: Leveraging DABs for Faster Deployment and Better Patient Outcomes](https://www.youtube.com/watch?v=01JHTM2UP-U) -## Jobs (former Workflows) +Other: +- [Goodbye Pip and Poetry. 
Why UV Might Be All You Need](https://codecut.ai/why-uv-might-all-you-need/) + +## πŸ“ Folder Structure + +``` +databricks-template/ +β”‚ +β”œβ”€β”€ .github/ # CI/CD automation +β”‚ └── workflows/ +β”‚ └── onpush.yml # GitHub Actions pipeline +β”‚ +β”œβ”€β”€ src/ # Main source code +β”‚ └── template/ # Python package +β”‚ β”œβ”€β”€ main.py # Entry point with CLI (argparse) +β”‚ β”œβ”€β”€ config.py # Configuration management +β”‚ β”œβ”€β”€ baseTask.py # Base class for all tasks +β”‚ β”œβ”€β”€ commonSchemas.py # Shared PySpark schemas +β”‚ └── job1/ # Job-specific tasks +β”‚ β”œβ”€β”€ extract_source1.py +β”‚ β”œβ”€β”€ extract_source2.py +β”‚ β”œβ”€β”€ generate_orders.py +β”‚ β”œβ”€β”€ generate_orders_agg.py +β”‚ β”œβ”€β”€ integration_setup.py +β”‚ └── integration_validate.py +β”‚ +β”œβ”€β”€ tests/ # Unit tests +β”‚ └── job1/ +β”‚ └── unit_test.py # Pytest unit tests +β”‚ +β”œβ”€β”€ resources/ # Databricks workflow templates +β”‚ β”œβ”€β”€ wf_template_serverless.yml # Jinja2 template for serverless +β”‚ β”œβ”€β”€ wf_template.yml # Jinja2 template for job clusters +β”‚ └── workflow.yml # Generated workflow (auto-created) +β”‚ +β”œβ”€β”€ scripts/ # Helper scripts +β”‚ β”œβ”€β”€ generate_template_workflow.py # Workflow generator (Jinja2) +β”‚ β”œβ”€β”€ sdk_analyze_job_costs.py # Cost analysis script +β”‚ └── sdk_workspace_and_account.py # Workspace and account management +β”‚ +β”œβ”€β”€ docs/ # Documentation assets +β”‚ β”œβ”€β”€ dag.png +β”‚ β”œβ”€β”€ task_output.png +β”‚ β”œβ”€β”€ data_lineage.png +β”‚ β”œβ”€β”€ data_quality.png +β”‚ └── ci_cd.png +β”‚ +β”œβ”€β”€ dist/ # Build artifacts (Python wheel) +β”œβ”€β”€ coverage_reports/ # Test coverage reports +β”‚ +β”œβ”€β”€ databricks.yml # Databricks Asset Bundle config +β”œβ”€β”€ pyproject.toml # Python project configuration (uv) +β”œβ”€β”€ Makefile # Build automation +β”œβ”€β”€ .pre-commit-config.yaml # Pre-commit hooks (ruff) +└── README.md # This file +``` + +## Jobs
@@ -89,10 +143,11 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle
-## Data Lineage (Catalog Explorer) +## Data Lineage
+
@@ -117,47 +172,27 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle ## Instructions -### 1) Create a Databricks Workspace +1) Create a workspace. Use a [Databricks Free Edition](https://docs.databricks.com/aws/en/getting-started/free-edition) workspace. -option 1) utilize a [Databricks Free Edition](https://docs.databricks.com/aws/en/getting-started/free-edition) workspace. -option 2) create a Premium workspace. Follow instructions [here](https://github.com/databricks/terraform-databricks-examples) +2) Install and configure Databricks CLI on your local machine. Follow instructions [here](https://docs.databricks.com/en/dev-tools/cli/install.html). Check the current version on databricks.yaml. -### 2) Install and configure Databricks CLI on your local machine - -Follow the instructions [here](https://docs.databricks.com/en/dev-tools/cli/install.html) - - -### 3) Build Python env and execute unit tests on your local machine +3) Build Python env and execute unit tests on your local machine make sync & make test -You can also execute unit tests from your preferred IDE. Here's a screenshot from [VS Code](https://code.visualstudio.com/) with [Microsoft's Python extension](https://marketplace.visualstudio.com/items?itemName=ms-python.python) installed. - - -### 4) Deploy and execute on the dev workspace. - -option 1) for Databricks Free Edition use: +4) Deploy and execute on the dev workspace. make deploy-serverless env=dev - make deploy-serverless env=staging - make deploy-serverless env=prod - - -option 2) for Premium workspace: - - Update "job_clusters" properties on wf_template.yml file. There are different properties for AWS and Azure. - make deploy env=dev - make deploy env=staging - make deploy env=prod +5) configure CI/CD automation. Configure [Github Actions repository secrets](https://docs.github.com/en/actions/security-guides/using-secrets-in-github-actions) (DATABRICKS_HOST and DATABRICKS_TOKEN). 
-### 5) configure CI/CD automation +6) You can also execute unit tests from your preferred IDE. Here's a screenshot from [VS Code](https://code.visualstudio.com/) with [Microsoft's Python extension](https://marketplace.visualstudio.com/items?itemName=ms-python.python) installed. -Configure [Github Actions repository secrets](https://docs.github.com/en/actions/security-guides/using-secrets-in-github-actions) DATABRICKS_HOST and DATABRICKS_TOKEN. +- ## Task parameters diff --git a/databricks.yml b/databricks.yml index 6bc5d95..e3ced86 100644 --- a/databricks.yml +++ b/databricks.yml @@ -2,6 +2,7 @@ # See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. bundle: name: default_python + databricks_cli_version: ">=0.286.0" artifacts: default: diff --git a/docs/ci_cd.drawio b/docs/ci_cd.drawio index bb83077..6fe2533 100755 --- a/docs/ci_cd.drawio +++ b/docs/ci_cd.drawio @@ -1,17 +1,17 @@ - + - + - + - + @@ -21,30 +21,30 @@ - - + + - - + + - - + + - + - - + + - + @@ -55,17 +55,11 @@ - - + + - - - - - - - - + + diff --git a/docs/ci_cd.png b/docs/ci_cd.png index 8251497..7d63661 100644 Binary files a/docs/ci_cd.png and b/docs/ci_cd.png differ diff --git a/pyproject.toml b/pyproject.toml index e2bfde1..0162d41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,23 +6,23 @@ authors = [ {name = "user@company.com"} ] readme = "README.md" -requires-python = ">=3.12" +requires-python = "==3.12.*" dependencies = [ "funcy==2.0", - "databricks-labs-dqx==0.9.3", + "databricks-labs-dqx==0.12.0", ] [project.optional-dependencies] dev = [ "numpy==2.1.3", "pandas==2.2.3", - "pyarrow==19.0.1", + "pyarrow==21.0.0", "pydantic==2.10.6", "coverage==7.6.1", "pre-commit==4.0.1", "pytest==8.3.5", "pytest-cov==5.0.0", - "pyspark==4.0.0", + "pyspark==4.1.0", "jinja2==3.1.5", ] diff --git a/resources/wf_template.yml b/resources/wf_template.yml deleted file mode 100644 index 8f02ff1..0000000 --- a/resources/wf_template.yml +++ /dev/null @@ -1,95 +0,0 @@ -# The main job 
for default_python -resources: - jobs: - - job1: - name: template_${bundle.target} - timeout_seconds: 3600 - - {% if environment == 'prod'%} - schedule: - quartz_cron_expression: "0 0 5 * * ?" - timezone_id: "UTC" - - no_alert_for_skipped_runs: false - {% endif %} - - tasks: - - - task_key: extract_source1 - job_cluster_key: cluster-dev-aws - max_retries: 0 - python_wheel_task: - package_name: template - entry_point: main - parameters: ["--task={{'{{task.name}}'}}", - "--env=${bundle.target}", - "${var.debug}"] - libraries: - - whl: ../dist/*.whl - - - task_key: extract_source2 - job_cluster_key: cluster-dev-aws - max_retries: 0 - python_wheel_task: - package_name: template - entry_point: main - parameters: ["--task={{'{{task.name}}'}}", - "--env=${bundle.target}", - "${var.debug}"] - libraries: - - whl: ../dist/*.whl - - - task_key: generate_orders - depends_on: - - task_key: extract_source1 - - task_key: extract_source2 - job_cluster_key: cluster-dev-aws - max_retries: 0 - python_wheel_task: - package_name: template - entry_point: main - parameters: ["--task={{'{{task.name}}'}}", - "--env=${bundle.target}", - "${var.debug}"] - libraries: - - whl: ../dist/*.whl - - - task_key: generate_orders_agg - depends_on: - - task_key: generate_orders - job_cluster_key: cluster-dev-aws - max_retries: 0 - python_wheel_task: - package_name: template - entry_point: main - parameters: ["--task={{'{{task.name}}'}}", - "--env=${bundle.target}", - "${var.debug}"] - libraries: - - whl: ../dist/*.whl - - job_clusters: - # - job_cluster_key: cluster-dev-azure - # new_cluster: - # spark_version: 15.3.x-scala2.12 - # node_type_id: Standard_D8as_v5 - # num_workers: 1 - # azure_attributes: - # first_on_demand: 1 - # availability: SPOT_AZURE - # data_security_mode: SINGLE_USER - - - job_cluster_key: cluster-dev-aws - new_cluster: - spark_version: 14.2.x-scala2.12 - node_type_id: c5d.xlarge - num_workers: 1 - aws_attributes: - first_on_demand: 1 - availability: SPOT_WITH_FALLBACK - zone_id: 
auto - spot_bid_price_percent: 100 - ebs_volume_count: 0 - policy_id: 001934F3ABD02D4A - data_security_mode: SINGLE_USER diff --git a/resources/wf_template_serverless.yml b/resources/wf_template_serverless.yml index d30c6d3..372a7a5 100644 --- a/resources/wf_template_serverless.yml +++ b/resources/wf_template_serverless.yml @@ -6,6 +6,12 @@ resources: name: template_${bundle.target} timeout_seconds: 3600 + # Git metadata tags for traceability + tags: + git_branch: "{{ branch }}" + deployed_by: "{{ developer }}" + {% if pr_number %}pr_number: "{{ pr_number }}"{% endif %} + # A list of task execution environment specifications that can be referenced by tasks of this job. deployment: kind: BUNDLE @@ -85,6 +91,12 @@ resources: name: template_${bundle.target}_integration_test timeout_seconds: 3600 + # Git metadata tags for traceability + tags: + git_branch: "{{ branch }}" + deployed_by: "{{ developer }}" + {% if pr_number %}pr_number: "{{ pr_number }}"{% endif %} + environments: - environment_key: default spec: diff --git a/scripts/generate_template_workflow.py b/scripts/generate_template_workflow.py index 1f23555..6623dd8 100644 --- a/scripts/generate_template_workflow.py +++ b/scripts/generate_template_workflow.py @@ -1,32 +1,82 @@ import sys - +import subprocess +import argparse from jinja2 import Environment, FileSystemLoader -if len(sys.argv) not in [2, 3]: - print("Usage: python generate_workflow.py [serverless]") - print("Example: python generate_workflow.py prod") - print("Example: python generate_workflow.py prod serverless") - sys.exit(1) -serverless = len(sys.argv) == 3 and sys.argv[2].lower() == "--serverless" -print(sys.argv[2].lower()) -print(f"Serverless mode: {serverless}") +def get_git_branch(): + """Get current git branch name, fallback to 'unknown' if not available.""" + try: + branch = ( + subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"], stderr=subprocess.DEVNULL) + .decode("utf-8") + .strip() + ) + return branch + except 
Exception: + return "unknown" + + +def get_git_user(): + """Get git user name, fallback to 'unknown' if not available.""" + try: + user = ( + subprocess.check_output(["git", "config", "user.name"], stderr=subprocess.DEVNULL).decode("utf-8").strip() + ) + return user + except Exception: + return "unknown" + + +def main(): + parser = argparse.ArgumentParser( + description="Generate Databricks workflow YAML from Jinja2 template", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python generate_template_workflow.py dev --serverless + python generate_template_workflow.py staging --serverless --branch main --developer john --pr-number 123 + """, + ) + + parser.add_argument("environment", help="Target environment (dev, staging, prod)") + parser.add_argument("--serverless", action="store_true", help="Use serverless workflow template") + parser.add_argument("--branch", help="Git branch name (auto-detected if not provided)") + parser.add_argument("--developer", help="Developer/deployer name (auto-detected if not provided)") + parser.add_argument("--pr-number", help="Pull request number (optional)") + + args = parser.parse_args() + + # Get or auto-detect git metadata + branch = args.branch if args.branch else get_git_branch() + developer = args.developer if args.developer else get_git_user() + pr_number = args.pr_number if args.pr_number else "" + + print(f"Environment: {args.environment}") + print(f"Serverless mode: {args.serverless}") + print(f"Git branch: {branch}") + print(f"Developer: {developer}") + print(f"PR number: {pr_number if pr_number else 'N/A'}") + + # Load and render template + file_loader = FileSystemLoader(".") + env = Environment(loader=file_loader) + + if args.serverless: + template = env.get_template("/resources/wf_template_serverless.yml") + else: + template = env.get_template("/resources/wf_template.yml") -environment = sys.argv[1] + # Render the template with all variables + output = 
template.render(environment=args.environment, branch=branch, developer=developer, pr_number=pr_number) -file_loader = FileSystemLoader(".") -env = Environment(loader=file_loader) -if serverless: - template = env.get_template("/resources/wf_template_serverless.yml") -else: - template = env.get_template("/resources/wf_template.yml") + # Save the rendered YAML to a file + output_file = "./resources/workflow.yml" + with open(output_file, "w") as f: + f.write(output) -# Render the template with the environment variable -output = template.render(environment=environment) + print(f"\nGenerated {output_file}") -# Save the rendered YAML to a file -output_file = "./resources/workflow.yml" -with open(output_file, "w") as f: - f.write(output) -print(f"Generated {output_file}") +if __name__ == "__main__": + main() diff --git a/scripts/sdk_analyze_job_costs.py b/scripts/sdk_analyze_job_costs.py new file mode 100644 index 0000000..e39b735 --- /dev/null +++ b/scripts/sdk_analyze_job_costs.py @@ -0,0 +1,231 @@ +""" +Analyze costs of Databricks jobs run today. + +This script queries Databricks system tables to calculate costs for jobs +executed today, providing insights into compute usage and billing. + +Usage: + python scripts/analyze_job_costs.py [--profile PROFILE] [--date DATE] + +Examples: + python scripts/analyze_job_costs.py + python scripts/analyze_job_costs.py --profile dev + python scripts/analyze_job_costs.py --date 2024-01-15 +""" + +import argparse +from datetime import datetime, timedelta +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.sql import StatementState +import time + + +def get_todays_date(): + """Get today's date in YYYY-MM-DD format.""" + return datetime.now().strftime("%Y-%m-%d") + + +def analyze_job_costs(workspace: WorkspaceClient, target_date: str): + """ + Analyze job costs for a specific date using system tables. 
+ + Args: + workspace: Databricks WorkspaceClient instance + target_date: Date to analyze in YYYY-MM-DD format + """ + print(f"\n{'=' * 80}") + print(f"Job Cost Analysis for {target_date}") + print(f"{'=' * 80}\n") + + # Query to get usage data from system tables + # Note: This assumes system tables are enabled for the metastore + query = f""" + SELECT + u.usage_metadata.job_id, + u.usage_metadata.job_name, + u.sku_name, + u.cloud, + SUM(usage_quantity) as total_dbu, + COUNT(DISTINCT u.usage_metadata.job_run_id) as num_runs, + ROUND(SUM(usage_quantity * list_prices.pricing.default), 2) as estimated_cost_usd + FROM + system.billing.usage u + LEFT JOIN + system.billing.list_prices ON u.sku_name = list_prices.sku_name + AND u.cloud = list_prices.cloud + WHERE + usage_date = '{target_date}' + AND + u.usage_metadata.job_id IS NOT NULL + GROUP BY + u.usage_metadata.job_id, + u.usage_metadata.job_name, + u.sku_name, + u.cloud + ORDER BY + estimated_cost_usd DESC + """ + + try: + print("Querying system.billing.usage table...\n") + result = workspace.statement_execution.execute_statement( + warehouse_id=get_warehouse_id(workspace), + statement=query, + wait_timeout="0s", # async + ) + + # Poll for result completion + while True: + result = workspace.statement_execution.get_statement(result.statement_id) + print("Waiting for query to complete...") + state = result.status.state + print(state) + if state in [StatementState.SUCCEEDED, StatementState.FAILED, StatementState.CANCELED]: + break + time.sleep(1) + + # print(result.result) + # print("-----") + # print(result.result.data_array) + + # Process and display results + if result.result and result.result.data_array: + display_job_costs(result.result.data_array) + display_summary(result.result.data_array) + else: + print(f"No job runs found for {target_date}") + + except Exception as e: + print(f"Error querying system tables: {e}") + fallback_analysis(workspace, target_date) + + +def get_warehouse_id(workspace: 
WorkspaceClient) -> str: + """Get the first available SQL warehouse ID.""" + warehouses = list(workspace.warehouses.list()) + if not warehouses: + raise ValueError("No SQL warehouse found. Please create one to query system tables.") + return warehouses[0].id + + +def display_job_costs(data_array): + print(f"{'Job ID':<15} {'Job Name':<40} {'SKU':<40} {'Runs':>8} {'DBUs':>10} {'Cost (USD)':>12}") + print("-" * 110) + + for row in data_array: + job_id = row[0] or "N/A" + job_name = row[1] or "Unnamed" + sku = row[2] or "Unnamed" + total_dbu = float(row[4] or 0) + num_runs = int(row[5] or 0) + cost = float(row[6] or 0) + + print(f"{str(job_id):<15} {job_name:<40.40} {sku:<40.40} {num_runs:>8} {total_dbu:>10.2f} ${cost:>11.2f}") + + +def display_summary(data_array): + print("\nSUMMARY") + print("=" * 80) + + total_jobs = len(set([row[0] for row in data_array if row[0]])) + total_dbu = sum([float(row[4] or 0) for row in data_array]) + # total_runs = sum([int(row[5] or 0) for row in data_array]) + total_cost = sum([float(row[6] or 0) for row in data_array]) + + print(f"Total Jobs: {total_jobs}") + # print(f"Total Runs: {total_runs}") + print(f"Total DBUs: {total_dbu:.2f}") + print(f"Total Cost: ${total_cost:.2f}") + print("=" * 80 + "\n") + + +def fallback_analysis(workspace: WorkspaceClient, target_date: str): + """ + Fallback method using Jobs API when system tables are unavailable. + Note: This doesn't provide cost data, only run information. 
+ """ + print("\nFallback: Using Jobs API for run information (costs not available)\n") + + target_datetime = datetime.strptime(target_date, "%Y-%m-%d") + start_time_ms = int(target_datetime.timestamp() * 1000) + end_time_ms = int((target_datetime + timedelta(days=1)).timestamp() * 1000) + + print(f"{'Job ID':<15} {'Job Name':<40} {'Status':<15} {'Start Time':<20}") + print("-" * 95) + + job_count = 0 + run_count = 0 + + try: + # List all jobs + for job in workspace.jobs.list(): + # Get runs for this job + runs = workspace.jobs.list_runs(job_id=job.job_id, start_time_from=start_time_ms, start_time_to=end_time_ms) + + for run in runs: + if run.start_time and start_time_ms <= run.start_time < end_time_ms: + job_name = job.settings.name if job.settings and job.settings.name else "Unnamed" + if len(job_name) > 38: + job_name = job_name[:35] + "..." + + start_time = datetime.fromtimestamp(run.start_time / 1000).strftime("%Y-%m-%d %H:%M:%S") + status = run.state.life_cycle_state if run.state else "UNKNOWN" + + print(f"{job.job_id:<15} {job_name:<40} {status:<15} {start_time:<20}") + run_count += 1 + + if runs: + job_count += 1 + + print(f"\nFound {run_count} runs across {job_count} jobs") + print("Note: Cost information requires system.billing.usage table access\n") + + except Exception as e: + print(f"Error accessing Jobs API: {e}\n") + + +def main(): + """Main entry point for the script.""" + parser = argparse.ArgumentParser( + description="Analyze Databricks job costs for today", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python scripts/analyze_job_costs.py + python scripts/analyze_job_costs.py --profile dev + python scripts/analyze_job_costs.py --date 2024-01-15 + """, + ) + + parser.add_argument("--profile", default="dev", help="Databricks CLI profile to use (default: dev)") + + parser.add_argument( + "--date", default=get_todays_date(), help="Date to analyze in YYYY-MM-DD format (default: today)" + ) + + args = 
parser.parse_args() + + # Validate date format + try: + datetime.strptime(args.date, "%Y-%m-%d") + except ValueError: + print(f"Error: Invalid date format '{args.date}'. Use YYYY-MM-DD") + return 1 + + # Initialize Databricks workspace client + try: + workspace = WorkspaceClient(profile=args.profile) + print(f"Connected to: {workspace.config.host}") + except Exception as e: + print(f"Error connecting to Databricks: {e}") + print(f"Ensure profile '{args.profile}' exists in ~/.databrickscfg") + return 1 + + # Run cost analysis + analyze_job_costs(workspace, args.date) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/scripts/sdk_system_tables.py b/scripts/sdk_system_tables.py deleted file mode 100644 index 4f9b835..0000000 --- a/scripts/sdk_system_tables.py +++ /dev/null @@ -1,77 +0,0 @@ -from databricks.sdk import WorkspaceClient -from databricks.sdk import AccountClient -import requests - -host = None -token = None - - -def demoWorkspaceApi(w): - for j in w.jobs.list(): - print("Job: " + str(j.job_id)) - - for c in w.catalogs.list(): - print("Catalog: " + c.name) - schemas = w.schemas.list(catalog_name=c.name) - for s in schemas: - print(" Schema: " + s.name) - - -def demoAccountApi(): - a = AccountClient(profile="account") - - print(a) - - for m in a.metastores.list(): - print("Metastore: " + m.metastore_id) - metastore = m - - return metastore - - -def enable(system_tables, metastore): - print("Enabling " + system_tables + " tables for " + metastore.name + " ...") - - update = f"{host}/api/2.0/unity-catalog/metastores/{metastore.metastore_id}/systemschemas/{system_tables}" - response = requests.put(update, headers=token) - - if response.status_code == 200: - print("OK") - else: - print("Failed") - print(response.text) - - -def enableSystemTables(metastore): - enable("billing", metastore) - - enable("access", metastore) - - enable("storage", metastore) - - enable("compute", metastore) - - enable("marketplace", metastore) - - 
enable("lineage", metastore) - - list = f"{host}/api/2.0/unity-catalog/metastores/a238eb20-95d3-4a62-91ea-629992514227/systemschemas" - response = requests.get(list, headers=token) - print(response.text) - - -if __name__ == "__main__": - workspace = WorkspaceClient( - profile="dev" # as configured in .databrickscfg - ) - - print(workspace) - - token = workspace.config.authenticate() - host = workspace.config.host - - demoWorkspaceApi(workspace) - - metastore = demoAccountApi() - - enableSystemTables(metastore) diff --git a/scripts/sdk_workspace_and_account.py b/scripts/sdk_workspace_and_account.py new file mode 100644 index 0000000..e1fc25b --- /dev/null +++ b/scripts/sdk_workspace_and_account.py @@ -0,0 +1,34 @@ +from databricks.sdk import WorkspaceClient +from databricks.sdk import AccountClient +import requests + + +def demoWorkspaceApi(): + workspace = WorkspaceClient( + profile="dev" # as configured in .databrickscfg + ) + + for j in workspace.jobs.list(): + print("Job: " + str(j.job_id)) + + for c in workspace.catalogs.list(): + print("Catalog: " + c.name) + schemas = workspace.schemas.list(catalog_name=c.name) + for s in schemas: + print(" Schema: " + s.name) + + +def demoAccountApi(): + # you need to run "databricks auth login -p " first... + a = AccountClient(profile="account1") + + print(a) + + for w in a.workspaces.list(): + print("Workspace: " + w.workspace_name) + + +if __name__ == "__main__": + demoWorkspaceApi() + + demoAccountApi()